Hey folks,

Hopefully not to make this KIP go for another spin :-), but I thought of a
modification that might actually address safety concerns over using flags
to ignore a vaguely specified class of errors.

What if we had the following overload of .send method:

  void send(ProducerRecord record, Callback callback, boolean
throwImmediately)

and if throwImmediately=false, then we behave the same way as now (return
errors via Future and poison transaction) and if throwImmediately=true then
we just throw errors immediately from the send function.  This way we don't
ignore the error, we're just changing the method they are delivered.  Then
KStreams can catch the error for send(record, callback, true) and do
whatever it needs to do.

-Artem


On Mon, Jul 15, 2024 at 4:30 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Matthias,
>
> Thank you for rejecting my suggested alternatives. Your responses are the
> sorts of things I expected to see summarized in the text of the KIP.
>
> I agree with most of your rejections, except this one:
>
> > "Estimation" is not sufficient, but we would need to know it exactly.
> > And that's an impl detail, given that the message format could change
> > and we could add new internal fields increasing the message size.
>
> An estimate is certainly going to have an error. But an estimate shouldn't
> be treated as exact anyway, there should be an error bound, or "safety
> factor" used when interpreting it. For example, if the broker side limit is
> 1MB, and an estimate could be wrong by ~10%, then computing an estimate and
> dropping records >900kb should be sufficient to prevent RTLEs.
> This is the sort of estimation that I would expect application developers
> could implement, without knowing the exact serialization and protocol
> overhead. This could prevent user-originated oversize records from making
> it to the producer.
>
> Thanks,
> Greg
>
>
> On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > I agree with Alieh and Artem -- in the end, why buffer records twice? We
> > effectively want to allow to push some error handling (which I btw
> > consider "business logic") into the producer. IMHO, there is nothing
> > wrong with it. Dropping a poison pill record is no really a violation of
> > atomicity from my POV, but a business logic decision to not include a
> > record in a transaction -- the proposed API just makes it much simpler
> > to achieve this business logic goal.
> >
> >
> >
> > For memory size estimation, throughput or message size is actually not
> > relevant, right? We would need to look at producer buffer size, ie,
> > `batch.size`, `max.in.flight.request.per.connection` and guesstimate the
> > number of connections there might be? At least for KS, we don't need to
> > buffer everything until commit, but only until we get a successful "ack"
> > back.
> >
> > Note that KS application not only need to write to (a single) user
> > result topic, but multiple output topics, as well as repartition and
> > changelog topics, across all tasks assigned to a thread (ie, producer),
> > which can easily be 10 tasks or more. If we assume topics with 30
> > partitions (topics with 50 or more partitions are not uncommon either),
> > and a producer who must write to 10 different topics, the number of
> > required connections is very quickly very high, and thus the required
> > "application buffer space" would be significant.
> >
> >
> >
> > Your others ideas seems not to be viable alternatives:
> >
> > > Streams users that specifically want to drop oversize records can
> > > estimate the size of their data and drop records which are too
> > > large, enforcing their own limits that are lower than the Kafka limits.
> >
> > "Estimation" is not sufficient, but we would need to know it exactly.
> > And that's an impl detail, given that the message format could change
> > and we could add new internal fields increasing the message size. The
> > idea to add some `producer.serializedRecordSize()` helper method was
> > discussed, but it's a very ugly API and clumsy to use -- also, the user
> > code would need to know the producer config which it might not have
> > access to (as it might get passed in from some config file; and it might
> > also be changed).
> >
> > Some other alternative we also discussed was, to let `send()` throw an
> > exception for a "record too large" case directly. However, this solution
> > raises backward compatibly concerns, and it might also not help us to
> > extend the solution in the future (eg, tackle broker side errors). So we
> > discarded this idea.
> >
> >
> >
> > > Streams users that want CONTINUE semantics can use at_least_once
> > > semantics
> >
> > Not really. EOS is mainly about not having duplicates in the result, but
> > at-least-once cannot provide this guarantee. (Even if I repeat my self:
> > but dropping a poison pill record based on a business logic decision is
> > not data loss, but effectively a business logic filter...)
> >
> >
> >
> > > Streams itself can store record hashes/coordinates and fast rewind to
> > > the end of the last transaction, recomputing data rather than storing
> it.
> >
> > Given the very complex nature of topologies, with joins, aggregations,
> > flatmaps etc, this is a 100x more complex solution and not viable in
> > practice.
> >
> >
> >
> > > Streams can define exactly_once + CONTINUE semantics to permit the
> whole
> > > transaction to be dropped, because it would allow the next batch to be
> > > started processing.
> >
> > Would this not be much worse? I have a single poison pill record and
> > would need to drop a full tx (this could be tens of thousands of
> > records...). Also, given that KS write into changelog topic in the same
> > TX, this could break the whole application.
> >
> >
> >
> > > Streams can emit records with both a transactional and
> non-transactional
> > > producer if some records are not critical-path
> >
> > We (1) already have a "too many connections" problem with KS so using
> > move clients is something we try to avoid (and we actually hope to
> > reduce the number of client and connection mid to long term), (2) this
> > would be very hard to express at the API level to the user, and (3) it
> > would provide very weird semantics.
> >
> >
> >
> > > they should optimize for smaller transactions,
> >
> > IMHO, this would not work in practice because transaction have a high
> > overhead and commit-interval is used to tradeoff throughput vs
> > end-to-end latency. Given certain throughput requirement, it would not
> > be possible to just use a lower commit interval to reduce memory
> > requirements.
> >
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 7/15/24 2:25 PM, Artem Livshits wrote:
> > > Hi Greg,
> > >
> > >> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary
> set
> > > of error conditions that may be expanded in the future, possibly to
> cover
> > > the broker side RecordTooLargeException.
> > >
> > > I don't think it contradicts what I said (the keyword here is "in the
> > > future") -- with the current functionality, the correct way to handle
> > RTLE
> > > is by only letting the client ignore client-originated RTLE (this can
> be
> > > easily implemented on the client side).  In the future, we can improve
> on
> > > that by making the broker return a different exception for
> > batch-too-large
> > > case, then the producer would be able to return broker side exceptions
> as
> > > well (and if the application chooses to ignore it -- it will be able
> to,
> > > but it would be an explicit choice rather than ignoring it by mistake),
> > in
> > > this case the producer client would encapsulate backward compatibility
> > > logic when it connects to older brokers to make sure the the
> application
> > > doesn't accidentally gets RTLE originated by the old broker.  This
> > > functionality is obviously more involved and we'll need to see if going
> > all
> > > the way is justified, but the partial client-only solution doesn't
> close
> > > the door.
> > >
> > > So one way to look at the current situation is the following:
> > >
> > > 1. We can do a low effort partial solution to solve a real existing
> > > problem.  We can easily prove that it would do exactly what it needs to
> > do
> > > with minimal risk of regression.
> > > 2. We have a path to a more comprehensive solution, so if we justify
> the
> > > effort required for that, we can get there.
> > >
> > > BTW, as a side note (I think a saw a question in the thread), we do try
> > to
> > > introduce error categories here
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > > so eventually we may have a better classification for the errors.
> > >
> > >> "if a streams producer is producing 1MB/s, and the commit interval is
> 1
> > > hour, I expect 3600MB of additional heap needed ...
> > >
> > > Agree, that would be ideal.  On the other hand, the effort to prove
> that
> > > keeping all records in memory won't break some scenarios (and generally
> > > breaking one is enough to cause a lot of pain) seems to be
> significantly
> > > higher than to prove that setting some flag in some API has pretty
> much 0
> > > chance of regression (we basically have a flag to say "unfix
> KAFKA-9279"
> > so
> > > we're getting to fairly "known good" state).  I'll let KStream folks
> > > comment on this one (and we still need to solve the problem of
> accidental
> > > handling of RTLE originated from broker, so some KIP would be required
> to
> > > somehow help to differentiate those).
> > >
> > > -Artem
> > >
> > > On Mon, Jul 15, 2024 at 1:31 PM Greg Harris
> <greg.har...@aiven.io.invalid
> > >
> > > wrote:
> > >
> > >> Hi Artem,
> > >>
> > >> Thank you for clarifying as I'm joining the conversation late and may
> > have
> > >> some misconceptions.
> > >>
> > >>> Because of this, a more "complete" solution that
> > >>> allows ignoring RecordTooLargeException regardless of its origin is
> > >>> actually incorrect, while a "partial" solution that allows ignoring
> > >>> RecordTooLargeException only originating in client code accomplishes
> > the
> > >>> required functionality.
> > >>
> > >> This is not how I understood this feature. Above Matthias said the
> > >> following:
> > >>
> > >>> We can do
> > >>> follow up KIP for other errors on an on-demand basis and fix-forward
> /
> > >>> enlarge the scope successively.
> > >>
> > >> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary
> > set of
> > >> error conditions that may be expanded in the future, possibly to cover
> > the
> > >> broker side RecordTooLargeException.
> > >>
> > >>> Obviously, we could solve this problem by changing logic in the
> > >>> broker to return a different error when the batch is too large, but
> > right
> > >>> now this is not the case
> > >>
> > >> If the broker/wire protocol isn't ready for these errors to be
> > propagated,
> > >> then I don't think we're ready to add this API. It's going to be
> > >> under-generalized, and there's a decent chance that we're going to
> > regret
> > >> the design choices in the future. And users that expect it to be fully
> > >> generalized are going to be disappointed when they don't read the fine
> > >> print and still get faulted by non-covered errors.
> > >>
> > >>> AL2.  In a high performance system, "just an optimization" can be a
> > >>> functional requirement ...
> > >>>   I just wanted to make the point that we shouldn't necessarily
> dismiss
> > >>> API changes that allow for optimizations.
> > >>
> > >> My earlier statement didn't dismiss this feature as "just an
> > optimization",
> > >> actually the opposite. I said that performance could be a
> justification,
> > >> but only if it is quantified and stated explicitly. We shouldn't be
> > voting
> > >> on hand-wavy optimizations, we should be voting on things that are
> > >> quantifiable.
> > >> For example an analysis like the following would facilitate further
> > >> discussion: "if a streams producer is producing 1MB/s, and the commit
> > >> interval is 1 hour, I expect 3600MB of additional heap needed per
> > >> producer". We can then discuss whether we expect higher or lower
> > >> throughput, commit intervals, or heap usage to determine what the
> > operating
> > >> envelope of this feature could be.
> > >> If there are a substantial number of users that have high throughput,
> > long
> > >> commit intervals, _and_ RTLEs, then this feature could make sense. If
> > not,
> > >> then the downsides of this feature (complication of the API,
> > >> under-specification of the error coverage, etc) look unjustified. In
> > fact,
> > >> if the number of users regularly encountering RTLEs is sufficiently
> > small,
> > >> I would strongly advocate for an application-specific workaround
> > instead of
> > >> trying to fix this in Streams, or make memory buffering an optional
> > feature
> > >> in streams.
> > >>
> > >> Thanks,
> > >> Greg
> > >>
> > >> On Mon, Jul 15, 2024 at 1:29 PM Greg Harris <greg.har...@aiven.io>
> > wrote:
> > >>
> > >>> Hi Alieh,
> > >>>
> > >>> Thanks for your response.
> > >>>
> > >>>> what does a user do
> > >>>> after a transaction is failed due to a `too-large-record `exception?
> > >> They
> > >>>> will submit the same batch without the problematic record again.
> > >>>
> > >>> If they re-submit the same record, they are indicating that this
> record
> > >> is
> > >>> an integral part of the transaction, and the transaction should only
> be
> > >>> committed with it present. If the record isn't integral to the
> > >> transaction,
> > >>> they shouldn't submit it as part of the transaction.
> > >>>
> > >>>> Regarding your solution to solve the issue application-side:  I am a
> > >>>> bit hesitant to keep all sent records in memory since I think
> > buffering
> > >>>> records twice (both in Streams and Producer) would not be an
> efficient
> > >>>> solution.
> > >>>
> > >>> I understand your hesitation, and this touches on the "performance"
> > >> caveat
> > >>> of the end-to-end arguments in system design. There are no perfect
> > >> designs,
> > >>> and some API cleanliness may be sacrificed in favor of more
> performant
> > >>> solutions. You would need to make a concrete and convincing argument
> > that
> > >>> the performance of this solution would be better than every
> > alternative.
> > >> To
> > >>> that end, I would recommend that you add more to the "Rejected
> > >>> Alternatives" section, as that is going to carry this proposal.
> > >>> Some alternatives that I can think of, but which aren't necessarily
> > >> better:
> > >>> 1. Streams users that specifically want to drop oversize records can
> > >>> estimate the size of their data and drop records which are too
> > >>> large, enforcing their own limits that are lower than the Kafka
> limits.
> > >>> 2. Streams users that want CONTINUE semantics can use at_least_once
> > >>> semantics
> > >>> 3. Streams itself can store record hashes/coordinates and fast rewind
> > to
> > >>> the end of the last transaction, recomputing data rather than storing
> > it.
> > >>> 4. Streams can define exactly_once + CONTINUE semantics to permit the
> > >>> whole transaction to be dropped, because it would allow the next
> batch
> > to
> > >>> be started processing.
> > >>> 5. Streams can emit records with both a transactional and
> > >>> non-transactional producer if some records are not critical-path
> > >>>
> > >>> To generalize this point: Suppose an application tries to minimize
> > >> storage
> > >>> costs by having only one party responsible for a piece of data at a
> > time.
> > >>> They initially have the data, call send(), and want to know the
> > earliest
> > >>> time they can forget the data and transfer the responsibility to
> Kafka.
> > >>> With a non-transactional producer, they are responsible for the data
> > >> until
> > >>> the send() callback has succeeded. With a transactional producer,
> they
> > >> are
> > >>> responsible for the data until commitTransaction() has succeeded.
> > >>> With this proposed change that makes the producer tolerate
> > >>> too-large-exceptions, applications are still responsible for storing
> > >> their
> > >>> data until commitTransaction() has succeeded, because
> > abortTransaction()
> > >>> could have also been called, or the producer could have been fenced,
> or
> > >> any
> > >>> number of other failures could have occurred. This feature does not
> > >> enable
> > >>> Streams to drop responsibility earlier, it carves out a specific
> > >> situation
> > >>> in which it doesn't have to rewind processing, which is a performance
> > >>> concern.
> > >>>
> > >>> For Streams and the general case, if an application is trying to
> > optimize
> > >>> storage costs, they should optimize for smaller transactions, because
> > >> this
> > >>> both lowers the bound on record re-delivery and lowers the likelihood
> > of
> > >> a
> > >>> bad record being included in any individual transaction.
> > >>>
> > >>> Thanks,
> > >>> Greg
> > >>>
> > >>> On Mon, Jul 15, 2024 at 12:35 PM Artem Livshits
> > >>> <alivsh...@confluent.io.invalid> wrote:
> > >>>
> > >>>> Hi Greg,
> > >>>>
> > >>>> What you say makes a lot of sense.  I just wanted to clarify a
> couple
> > of
> > >>>> subtle points.
> > >>>>
> > >>>> AL1. There is a functional reason to handle errors that happen on
> send
> > >>>> (oginate in the producer logic in the client) vs. errors that are
> > >> returned
> > >>>> from the broker.  The problem is that RecordTooLargeException is
> > >> returned
> > >>>> in two cases: (1) the producer logic on the client checks that
> record
> > is
> > >>>> too large and throws the exception before doing anything with this
> --
> > >> this
> > >>>> is very "clean" situation with one specific record being marked as
> > >> "poison
> > >>>> pill" and rejected; (2) the broker throws the same error if the
> batch
> > is
> > >>>> too large -- the batch may include multiple records and none of them
> > >> would
> > >>>> necessarily be a "poison pill" record, it's just a random
> > >> misconfiguration
> > >>>> of client vs. broker.  Because of this, a more "complete" solution
> > that
> > >>>> allows ignoring RecordTooLargeException regardless of its origin is
> > >>>> actually incorrect, while a "partial" solution that allows ignoring
> > >>>> RecordTooLargeException only originating in client code accomplishes
> > the
> > >>>> required functionality.  This is an important nuance and should be
> > added
> > >>>> to
> > >>>> the KIP.  Obviously, we could solve this problem by changing logic
> in
> > >> the
> > >>>> broker to return a different error when the batch is too large, but
> > >> right
> > >>>> now this is not the case (and to have the correct error handling
> we'd
> > >> need
> > >>>> to know the version of the broker so we can only drop the records if
> > the
> > >>>> error is returned from a broker that knows to return a different
> > error).
> > >>>>
> > >>>> AL2.  In a high performance system, "just an optimization" can be a
> > >>>> functional requirement -- if a solution impacts memory or
> > computational
> > >>>> complexity (in the sense of bigO notation) on the main code path I
> can
> > >>>> justify changing APIs to avoid such an impact.  I'll let KStream
> folks
> > >>>> comment on whether an implementation that requires storing records
> in
> > >>>> memory actually violates the computational complexity on the main
> code
> > >>>> path, I just wanted to make the point that we shouldn't necessarily
> > >>>> dismiss
> > >>>> API changes that allow for optimizations.
> > >>>>
> > >>>> -Artem
> > >>>>
> > >>>> On Fri, Jul 12, 2024 at 1:07 PM Greg Harris
> > >> <greg.har...@aiven.io.invalid
> > >>>>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> Alieh, thanks for the KIP! And everyone else, thanks for the robust
> > >>>>> discussion.
> > >>>>>
> > >>>>> I understand that there are situations in which users desire that
> the
> > >>>>> pipeline "just keep working" and skip errors. However, I question
> > >>>> whether
> > >>>>> it is appropriate to support/encourage this behavior via inclusion
> in
> > >>>> the
> > >>>>> Producer API.
> > >>>>> This feature is essentially a "non-atomic transaction", as it
> allows
> > >>>>> commits in which not all records passed to send() ultimately get
> > >>>> committed.
> > >>>>> As atomicity is one of the most important semantics associated with
> > >>>>> transactions, I question whether there are users other than Streams
> > >> that
> > >>>>> would choose non-atomic transactions over a traditional/idempotent
> > >>>>> producer.
> > >>>>> Some cursory research shows that non-atomic transactions may be
> > >> present
> > >>>> in
> > >>>>> other databases, but is actively discouraged due to the complexity
> > >> they
> > >>>> add
> > >>>>> to error-handling. [1]
> > >>>>>
> > >>>>> I'd like to invoke the End-to-End Arguments in System Design [2]
> > here,
> > >>>> and
> > >>>>> recommend that this behavior may be present in Streams, but should
> > not
> > >>>> be
> > >>>>> in the Producer.
> > >>>>> 1. Dropping records that cause errors is already expressible via
> the
> > >>>>> current Producer API. You can store the records in-memory after
> > >> calling
> > >>>>> send(), wait for a successful no-error flush() before calling
> > >>>>> commitTransaction() and allowing the record to be garbage
> collected.
> > >> If
> > >>>>> errors occur, abortTransaction() and re-submit the records.
> > >>>>> 2. Implementing this inside the Producer API is complex and
> difficult
> > >> to
> > >>>>> holistically define in a way that we won't regret or need to change
> > >>>> later.
> > >>>>> I think some of the disagreement in this thread originates from
> this,
> > >>>> and I
> > >>>>> don't find the proposed API satisfactory.
> > >>>>> 3. The performance improvement of including this change in the
> lower
> > >>>> level
> > >>>>> needs to be quantified in order to be a justification, and I don't
> > see
> > >>>> any
> > >>>>> analysis about this.
> > >>>>>
> > >>>>> I imagine that the alternative implementation I suggested in (1)
> > would
> > >>>> also
> > >>>>> enable more expressive error handlers in Streams, if such a thing
> was
> > >>>>> desired. Keeping the record around until after the transaction is
> > >>>> committed
> > >>>>> would enable a DLQ or passing the erroneous record to the error
> > >> handler.
> > >>>>>
> > >>>>> I think that the current pattern of the application being
> responsible
> > >>>> for
> > >>>>> providing good data to the producer is very reasonable; Having the
> > >>>> producer
> > >>>>> responsible for implementing the application's error handling of
> bad
> > >>>> data
> > >>>>> is not something I can support.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Greg
> > >>>>>
> > >>>>> [1] https://www.sommarskog.se/error_handling/Part1.html
> > >>>>> [2]
> > >> https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf
> > >>>>>
> > >>>>> On Fri, Jul 12, 2024 at 8:52 AM Justine Olshan
> > >>>>> <jols...@confluent.io.invalid>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Can we update the KIP to clearly document these decisions?
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Justine
> > >>>>>>
> > >>>>>> On Tue, Jul 9, 2024 at 9:25 AM Andrew Schofield <
> > >>>>> andrew_schofi...@live.com
> > >>>>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Chris,
> > >>>>>>> As it stands, the error handling for transactions in
> KafkaProducer
> > >>>> is
> > >>>>> not
> > >>>>>>> ideal. There’s no reason why a failed operation should fail a
> > >>>>> transaction
> > >>>>>>> provided that the application can tell that the operation was not
> > >>>>>> included
> > >>>>>>> in the transaction and then make its own decision whether to
> > >>>> continue
> > >>>>> or
> > >>>>>>> back out. So, I think I disagree with the original premise of a
> > >>>>>> client-side
> > >>>>>>> error state for a transaction, but we are where we are.
> > >>>>>>>
> > >>>>>>> When I voted, I did not expect the KIP to handle ALL errors which
> > >>>> could
> > >>>>>>> conceivably be handled. I did expect it to handle client-side
> send
> > >>>>> errors
> > >>>>>>> that would cause a record to be rejected from a batch before
> > >> sending
> > >>>>> to a
> > >>>>>>> broker. I think that it does make the KafkaProducer interface
> very
> > >>>>>> slightly
> > >>>>>>> more complicated, but the new option is a clear improvement and I
> > >>>>>>> don’t see anyone getting into a mess using it.
> > >>>>>>>
> > >>>>>>> I think broker-side errors are more tricky and I don’t think an
> > >>>>> overload
> > >>>>>>> on the send() method is going to do the job. I don’t see that as
> a
> > >>>>>> problem
> > >>>>>>> with the KIP, just that the underlying RPCs and behaviour is not
> > >>>> very
> > >>>>>>> amenable to record-specific error handling. The Produce RPC is a
> > >>>>>>> complicated beast which can include a set of records for mutiple
> > >>>>>>> topic-partitions. Although ProduceResponse v10 does include
> record
> > >>>>>>> errors, I don’t believe this is surfaced in the client. Let’s
> > >>>> imagine
> > >>>>>>> something
> > >>>>>>> like broker-side record validation which barfs on one record.
> > >>>> Failing
> > >>>>> an
> > >>>>>>> entire batch is easier, but less useful if the problem is related
> > >> to
> > >>>>> one
> > >>>>>>> record.
> > >>>>>>>
> > >>>>>>> In summary, I’m happy that my vote stands, and I am happy with
> the
> > >>>> KIP
> > >>>>>>> only supporting client-side record errors.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Andrew
> > >>>>>>>
> > >>>>>>>> On 8 Jul 2024, at 16:37, Chris Egerton <chr...@aiven.io.INVALID
> > >>>
> > >>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi Alieh,
> > >>>>>>>>
> > >>>>>>>> Can you clarify why broker-side errors shouldn't be covered? The
> > >>>> only
> > >>>>>>> real
> > >>>>>>>> rationale I can come up with is that it's easier to implement.
> > >>>>>>>>
> > >>>>>>>> "Things were better for Kafka Streams before KAFKA-9279 was
> > >> fixed"
> > >>>>>> isn't
> > >>>>>>>> very convincing, because Kafka Streams is not the only user of
> > >> the
> > >>>>> Java
> > >>>>>>>> producer client. And for others, especially new users, I doubt
> > >>>> that
> > >>>>>> this
> > >>>>>>>> new API we're proposing would make sense without having to
> > >>>> consult a
> > >>>>>> lot
> > >>>>>>> of
> > >>>>>>>> historical context.
> > >>>>>>>>
> > >>>>>>>> I also don't think that most users will know or even care about
> > >>>> the
> > >>>>>>>> distinction between errors that cause a record to fail before
> > >> it's
> > >>>>>> added
> > >>>>>>> to
> > >>>>>>>> a batch vs. after. If you were writing a producer application of
> > >>>> your
> > >>>>>>> own,
> > >>>>>>>> and you wanted to handle RecordTooLargeException instances by
> > >>>>> dropping
> > >>>>>> a
> > >>>>>>>> record without aborting a transaction, would you care about
> > >>>> whether
> > >>>>> it
> > >>>>>>> was
> > >>>>>>>> your client or your broker that balked? Would you be happy if
> > >> you
> > >>>>> wrote
> > >>>>>>>> logic expecting that that problem was solved once and for all,
> > >>>> only
> > >>>>> to
> > >>>>>>>> learn that it could still affect you in other circumstances? Or,
> > >>>>>>>> alternatively, would you be happy if you wanted to solve that
> > >>>> problem
> > >>>>>> and
> > >>>>>>>> found an API that seemed to do exactly what you wanted, but
> > >> after
> > >>>>>> reading
> > >>>>>>>> the fine print, realized you'd have to do it yourself instead?
> > >>>>>>>>
> > >>>>>>>> Ultimately, the more I think about this, the more I believe that
> > >>>>> we're
> > >>>>>>>> adding noise to the API (with the new overloaded variant of
> > >> send)
> > >>>>> for a
> > >>>>>>>> feature that will likely bring confusion and even frustration to
> > >>>>> anyone
> > >>>>>>>> besides maintainers of Kafka Streams who tries to use it.
> > >>>>>>>>
> > >>>>>>>> If the only concern about covering broker-side errors is that it
> > >>>>> would
> > >>>>>> be
> > >>>>>>>> more difficult to implement, I believe we should strongly
> > >>>> reconsider
> > >>>>>> that
> > >>>>>>>> alternative. That said, if there is a straightforward way to
> > >>>> explain
> > >>>>>> this
> > >>>>>>>> feature to new users that won't mislead them or require them to
> > >> do
> > >>>>>>> research
> > >>>>>>>> on producer internals, then I can still live with it.
> > >>>>>>>>
> > >>>>>>>> Regarding a list of recoverable vs. irrecoverable errors, this
> > >> is
> > >>>>>>> actually
> > >>>>>>>> the subject of another recently-introduced KIP:
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > >>>>>>>>
> > >>>>>>>> Finally, I'd also like to ask the people who have already voted
> > >>>>>> (Andrew,
> > >>>>>>>> Matthias) if, at the time they voted, they believed that the API
> > >>>>> would
> > >>>>>>>> handle all errors, or only the subset of errors that would
> > >> cause a
> > >>>>>> record
> > >>>>>>>> to be rejected from a batch before it can be sent to a broker.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>>
> > >>>>>>>> Chris
> > >>>>>>>>
> > >>>>>>>> On Thu, Jul 4, 2024 at 12:43 PM Alieh Saeedi
> > >>>>>>> <asae...@confluent.io.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Salut from the KIP’s author
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Clarifying two points:
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 1) broker side errors:
> > >>>>>>>>>
> > >>>>>>>>> As far as I remember we are not going to cover the errors
> > >>>>> originating
> > >>>>>>> from
> > >>>>>>>>> the broker!
> > >>>>>>>>>
> > >>>>>>>>> A historical fact: One of the debate points in KIP-1038 was
> > >> that
> > >>>> by
> > >>>>>>>>> defining a producer custom handler, the user may assume that
> > >>>>>> broker-side
> > >>>>>>>>> errors must be covered as well. They may define a handler for
> > >>>>> handling
> > >>>>>>>>> `RecordTooLargeException` and still see such errors not being
> > >>>>> handled
> > >>>>>> as
> > >>>>>>>>> they wish.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 2) Regarding irrecoverable/recoverable errors:
> > >>>>>>>>>
> > >>>>>>>>> Before the fix of `KAFKA-9279`,  errors such as
> > >>>>>>> `RecordTooLargeException`
> > >>>>>>>>> or errors related to missing meta data (both originating from
> > >>>>> Producer
> > >>>>>>>>> `send()`) were considered as recoverable but after that they
> > >>>> turned
> > >>>>>> into
> > >>>>>>>>> being irrecoverable without changing any Javadocs or having any
> > >>>> KIP.
> > >>>>>>> All
> > >>>>>>>>> the effort made in this KIP and the former one have been
> > >> towards
> > >>>>>>> returning
> > >>>>>>>>> to the former state.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> I am sure that it is clear for you that which sort of errors we
> > >>>> are
> > >>>>>>> going
> > >>>>>>>>> to cover: A single record may happen to NOT get added to the
> > >>>> batch
> > >>>>> due
> > >>>>>>> to
> > >>>>>>>>> the issues with the record or its corresponding topic. The
> > >> point
> > >>>> was
> > >>>>>>> that
> > >>>>>>>>> if the record is not added to the batch let ’s don’t fail the
> > >>>> whole
> > >>>>>>> batch
> > >>>>>>>>> because of that non-existing record. We never intended to do
> > >> sth
> > >>>> in
> > >>>>>>> broker
> > >>>>>>>>> side or ignore more important errors.  But I agree with you
> > >>>> Chris.
> > >>>>> If
> > >>>>>> we
> > >>>>>>>>> are adding a new API, we must have good documentation for that.
> > >>>> The
> > >>>>>>>>> sentence `all irrecoverable transactional errors will still be
> > >>>>> fatal`
> > >>>>>> as
> > >>>>>>>>> you suggested is good. What do you think? I am totally against
> > >>>>>>> enumerating
> > >>>>>>>>> errors in Javadocs since these sort of errors can be changing
> > >>>> during
> > >>>>>>>>> time.  More
> > >>>>>>>>> over, have you ever seen any list of recoverable or
> > >> irrecoverable
> > >>>>>> errors
> > >>>>>>>>> somewhere so far?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Bests,
> > >>>>>>>>>
> > >>>>>>>>> Alieh
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Jul 3, 2024 at 6:07 PM Chris Egerton
> > >>>>> <chr...@aiven.io.invalid
> > >>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Justine,
> > >>>>>>>>>>
> > >>>>>>>>>> I agree that enumerating a list of errors that should be
> > >>>> covered by
> > >>>>>> the
> > >>>>>>>>> KIP
> > >>>>>>>>>> is difficult; I was thinking it might be easier if we list the
> > >>>>> errors
> > >>>>>>>>> that
> > >>>>>>>>>> should _not_ be covered by the KIP, and only if we can't
> > >> define
> > >>>> a
> > >>>>>>>>>> reasonable heuristic that would cover them without having to
> > >>>>>> explicitly
> > >>>>>>>>>> list them. Could it be enough to say "all irrecoverable
> > >>>>> transactional
> > >>>>>>>>>> errors will still be fatal", or even just "all transactional
> > >>>> errors
> > >>>>>> (as
> > >>>>>>>>>> opposed to errors related to this specific record) will still
> > >> be
> > >>>>>>> fatal"?
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>>
> > >>>>>>>>>> Chris
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Jul 3, 2024 at 11:56 AM Justine Olshan
> > >>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I think what you say makes sense. I agree that defining the
> > >>>>> behavior
> > >>>>>>>>>> based
> > >>>>>>>>>>> on code that can possibly change is not a good idea, and I
> > >> was
> > >>>>>> trying
> > >>>>>>>>> to
> > >>>>>>>>>>> get a clearer definition from the KIP's author :)
> > >>>>>>>>>>>
> > >>>>>>>>>>> I think it can always be hard to ensure that only specific
> > >>>> errors
> > >>>>>> are
> > >>>>>>>>>>> handled unless they are explicitly enumerated in code as the
> > >>>> code
> > >>>>>> can
> > >>>>>>>>>>> change and can be changed by folks who are not aware of this
> > >>>> KIP
> > >>>>> or
> > >>>>>>>>>>> conversation.
> > >>>>>>>>>>> I personally don't have the bandwidth to do this
> > >>>>>>> definition/enumeration
> > >>>>>>>>>> of
> > >>>>>>>>>>> errors, so hopefully Alieh can expand upon this.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Justine
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Jul 3, 2024 at 8:28 AM Chris Egerton
> > >>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I don't love defining the changes for this KIP in terms of a
> > >>>>> catch
> > >>>>>>>>>> clause
> > >>>>>>>>>>>> in the KafkaProducer class, for two reasons. First, the set
> > >> of
> > >>>>>> errors
> > >>>>>>>>>>> that
> > >>>>>>>>>>>> are handled by that clause may shift over time as the code
> > >>>> base
> > >>>>> is
> > >>>>>>>>>>>> modified, and second, it would be fairly opaque to users who
> > >>>> want
> > >>>>>> to
> > >>>>>>>>>>>> understand whether an error would be affected by using this
> > >>>> API
> > >>>>> or
> > >>>>>>>>> not.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It also seems strange that we'd handle some types of
> > >>>>>>>>>>>> RecordTooLargeException (i.e., ones reported client-side)
> > >> with
> > >>>>> this
> > >>>>>>>>>> API,
> > >>>>>>>>>>>> but not others (i.e., ones reported by a broker).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I think this kind of API would be most powerful, most
> > >>>> intuitive
> > >>>>> to
> > >>>>>>>>>> users,
> > >>>>>>>>>>>> and easiest to document if we expanded the scope to all
> > >>>>>>>>>>> record-send-related
> > >>>>>>>>>>>> errors, except anything indicating issues with exactly-once
> > >>>>>>>>> semantics.
> > >>>>>>>>>>> That
> > >>>>>>>>>>>> would include records that are too large (when caught both
> > >>>>> client-
> > >>>>>>>>> and
> > >>>>>>>>>>>> server-side), records that can't be sent due to
> > >> authorization
> > >>>>>>>>> failures,
> > >>>>>>>>>>>> records sent to nonexistent topics/topic partitions, and
> > >>>> keyless
> > >>>>>>>>>> records
> > >>>>>>>>>>>> sent to compacted topics. It would not include
> > >>>>>>>>>>>> ProducerFencedException, InvalidProducerEpochException,
> > >>>>>>>>>>>> UnsupportedVersionException,
> > >>>>>>>>>>>> and possibly others.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> @Justine -- do you think it would be possible to develop
> > >>>> either a
> > >>>>>>>>>> better
> > >>>>>>>>>>>> definition for the kinds of "excluded" errors that should
> > >> not
> > >>>> be
> > >>>>>>>>>> covered
> > >>>>>>>>>>> by
> > >>>>>>>>>>>> this API, or, barring that, a comprehensive list of exact
> > >>>> error
> > >>>>>>>>> types?
> > >>>>>>>>>>> And
> > >>>>>>>>>>>> do you think this would be acceptable in terms of risk and
> > >>>>>>>>> complexity?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Chris
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, Jul 2, 2024 at 5:05 PM Alieh Saeedi
> > >>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> About the consequences: the consequences will be like when
> > >> we
> > >>>>> did
> > >>>>>>>>> not
> > >>>>>>>>>>>> have
> > >>>>>>>>>>>>> the fix made in `KAFKA-9279`: silent loss of data!
> > >> Obviously,
> > >>>>> when
> > >>>>>>>>>> the
> > >>>>>>>>>>>> user
> > >>>>>>>>>>>>> intentionally chose to ignore errors, that would not be
> > >>>> silent
> > >>>>> any
> > >>>>>>>>>>> more.
> > >>>>>>>>>>>>> Right?
> > >>>>>>>>>>>>> Of course, considering all types of `ApiException`s would
> > >> be
> > >>>> too
> > >>>>>>>>>> broad.
> > >>>>>>>>>>>> But
> > >>>>>>>>>>>>> are the exceptions caught in `catch(ApiException e)` of the
> > >>>>>>>>>> `doSend()`
> > >>>>>>>>>>>>> method also too broad?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Alieh
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, Jul 2, 2024 at 9:45 PM Justine Olshan
> > >>>>>>>>>>>> <jols...@confluent.io.invalid
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> If we want to allow any error to be ignored we should
> > >>>> probably
> > >>>>>>>>> run
> > >>>>>>>>>>>>> through
> > >>>>>>>>>>>>>> all the errors to make sure they make sense.
> > >>>>>>>>>>>>>> I just want to feel confident that we aren't just making a
> > >>>>>>>>> decision
> > >>>>>>>>>>>>> without
> > >>>>>>>>>>>>>> considering the consequences carefully.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:30 PM Alieh Saeedi
> > >>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> yes we talked about `RecordTooLargeException` as an
> > >>>> example,
> > >>>>>>>>> but
> > >>>>>>>>>>> did
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> ever limit ourselves to only this specific exception? I
> > >>>> think
> > >>>>>>>>>>> neither
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>> the KIP nor in the PR.  As Chris mentioned, this KIP is
> > >>>> going
> > >>>>>>>>> to
> > >>>>>>>>>>> undo
> > >>>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>> we have done in `KAFKA-9279` in case 1) the user is in a
> > >>>>>>>>>>> transaction
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>>> 2)
> > >>>>>>>>>>>>>>> he decides to ignore the errors in which the record was
> > >> not
> > >>>>>>>>> even
> > >>>>>>>>>>>> added
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> the batch. Yes, and we suggested some methods for undoing
> > >>>> or,
> > >>>>>>>>> in
> > >>>>>>>>>>>> fact,
> > >>>>>>>>>>>>>>> moving back the transaction from the error state in
> > >>>> `flush` or
> > >>>>>>>>> in
> > >>>>>>>>>>>>>>> `commitTnx` and we finally came to the idea of not even
> > >>>> doing
> > >>>>>>>>> the
> > >>>>>>>>>>>>> changes
> > >>>>>>>>>>>>>>> (better than undoing) in `send`.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 8:03 PM Justine Olshan
> > >>>>>>>>>>>>>> <jols...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hey folks,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I understand where you are coming from by asking for
> > >>>> specific
> > >>>>>>>>>> use
> > >>>>>>>>>>>>>> cases.
> > >>>>>>>>>>>>>>> My
> > >>>>>>>>>>>>>>>> understanding based on previous conversations was that
> > >>>> there
> > >>>>>>>>>>> were a
> > >>>>>>>>>>>>> few
> > >>>>>>>>>>>>>>>> different errors that have been seen.
> > >>>>>>>>>>>>>>>> One example I heard some information about was when the
> > >>>>>>>>> record
> > >>>>>>>>>>> was
> > >>>>>>>>>>>>> too
> > >>>>>>>>>>>>>>>> large and it fails the batch. Besides that, I'm not
> > >> really
> > >>>>>>>>> sure
> > >>>>>>>>>>> if
> > >>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>> are cases in mind, though it is fair to ask on those and
> > >>>>>>>>> bring
> > >>>>>>>>>>> them
> > >>>>>>>>>>>>> up.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Does a record qualify as a poison pill if it targets a
> > >>>>>>>>> topic
> > >>>>>>>>>>> that
> > >>>>>>>>>>>>>>>> doesn't exist? Or if it targets a topic that the
> > >> producer
> > >>>>>>>>>>> principal
> > >>>>>>>>>>>>>> lacks
> > >>>>>>>>>>>>>>>> ACLs for? What if it fails broker-side validation (e.g.,
> > >>>> has
> > >>>>>>>>> a
> > >>>>>>>>>>> null
> > >>>>>>>>>>>>> key
> > >>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>> a compacted topic)?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I think there was some parallel work with addressing the
> > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionError in another way. As for the
> > >>>> other
> > >>>>>>>>>>>> checks,
> > >>>>>>>>>>>>>>> acls,
> > >>>>>>>>>>>>>>>> validation etc. I am not aware of that being in Alieh's
> > >>>>>>>>> scope,
> > >>>>>>>>>>> but
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>> should be clear about exactly what we are doing.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> All errors that fall into ApiException seems too broad
> > >> to
> > >>>> me.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 10:51 AM Alieh Saeedi
> > >>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>>>>>>> thanks for sharing your concerns.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1) About the language of KIP (or maybe later in
> > >>>> Javadocs):
> > >>>>>>>>> Is
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>> alright
> > >>>>>>>>>>>>>>>>> if I write all errors that fall into the `ApiException`
> > >>>>>>>>>>> category
> > >>>>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>> (actually returned) by Producer?
> > >>>>>>>>>>>>>>>>> 2) About future expansion: do you have any better
> > >>>>>>>>> suggestions
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>>> enum
> > >>>>>>>>>>>>>>>>> names? Do you think `IGNORE_API_EXEPTIONS` or something
> > >>>>>>>>> like
> > >>>>>>>>>>> that
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>> "better/more accurate" one?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 7:29 PM Chris Egerton
> > >>>>>>>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi Alieh and Justine,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I'm concerned that we're settling on a definition of
> > >>>>>>>>>> "poison
> > >>>>>>>>>>>>> pill"
> > >>>>>>>>>>>>>>>> that's
> > >>>>>>>>>>>>>>>>>> easiest to tackle right now but may lead to
> > >> shortcomings
> > >>>>>>>>>> down
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> road. I
> > >>>>>>>>>>>>>>>>>> understand the relationship between this KIP and
> > >>>>>>>>>> KAFKA-9279,
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>> totally get behind the desire to keep things small,
> > >>>>>>>>>> focused,
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>> simple
> > >>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> the name of avoiding bugs. However, what I don't think
> > >>>> is
> > >>>>>>>>>>> clear
> > >>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> what the "specific circumstances" are that Justine
> > >>>>>>>>>>> mentioned. I
> > >>>>>>>>>>>>>> had a
> > >>>>>>>>>>>>>>>>>> drastically different idea of what the intended
> > >>>>>>>>> behavioral
> > >>>>>>>>>>>> change
> > >>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> before looking at the draft PR.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I would like 1) for us to be clearer about the
> > >>>> categories
> > >>>>>>>>>> of
> > >>>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> want to cover with this new API (especially since
> > >> we'll
> > >>>>>>>>>> have
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>> find
> > >>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>> clear, succinct way to document this for users), and
> > >> 2)
> > >>>>>>>>> to
> > >>>>>>>>>>> make
> > >>>>>>>>>>>>>> sure
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> if we do try to expand this API in the future, that we
> > >>>>>>>>>> won't
> > >>>>>>>>>>> be
> > >>>>>>>>>>>>>>> painted
> > >>>>>>>>>>>>>>>>>> into a corner.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For item 1, hopefully we can agree that the language
> > >> in
> > >>>>>>>>> the
> > >>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>> for IGNORE_SEND_ERRORS ("The records causing
> > >>>>>>>>> irrecoverable
> > >>>>>>>>>>>> errors
> > >>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>> excluded from the batch and the transaction is
> > >> committed
> > >>>>>>>>>>>>>>>> successfully.")
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> pretty vague. If we start using the phrase "poison
> > >> pill
> > >>>>>>>>>>> record"
> > >>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>> help, but IMO more detail would still be needed. We
> > >> know
> > >>>>>>>>>> that
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> include records that are so large that they can be
> > >>>>>>>>>>> immediately
> > >>>>>>>>>>>>>>> rejected
> > >>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>> the producer. But there are other cases that users
> > >> might
> > >>>>>>>>>>> expect
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> handled. Does a record qualify as a poison pill if it
> > >>>>>>>>>>> targets a
> > >>>>>>>>>>>>>> topic
> > >>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> doesn't exist? Or if it targets a topic that the
> > >>>> producer
> > >>>>>>>>>>>>> principal
> > >>>>>>>>>>>>>>>> lacks
> > >>>>>>>>>>>>>>>>>> ACLs for? What if it fails broker-side validation
> > >> (e.g.,
> > >>>>>>>>>> has
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>> null
> > >>>>>>>>>>>>>>> key
> > >>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>> a compacted topic)?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For item 2, this really depends on how narrow the
> > >> scope
> > >>>>>>>>> of
> > >>>>>>>>>>> what
> > >>>>>>>>>>>>>> we're
> > >>>>>>>>>>>>>>>>> doing
> > >>>>>>>>>>>>>>>>>> right now is. If we only handle a subset of the
> > >> examples
> > >>>>>>>>> I
> > >>>>>>>>>>> laid
> > >>>>>>>>>>>>> out
> > >>>>>>>>>>>>>>>> above
> > >>>>>>>>>>>>>>>>>> that could possibly be considered poison pills with
> > >> this
> > >>>>>>>>>> KIP,
> > >>>>>>>>>>>> do
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> lock ourselves in to never addressing more in the
> > >>>> future,
> > >>>>>>>>>> or
> > >>>>>>>>>>>> can
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>> choose
> > >>>>>>>>>>>>>>>>>> an API (probably just enum names would be the only
> > >>>>>>>>>> important
> > >>>>>>>>>>>>>> decision
> > >>>>>>>>>>>>>>>>> here)
> > >>>>>>>>>>>>>>>>>> that leaves room for more later?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:28 PM Justine Olshan
> > >>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Chris and Alieh,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> My understanding is that this KIP is really only
> > >> trying
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> solve
> > >>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>>>>> of a "poison pill" record that fails send().
> > >>>>>>>>>>>>>>>>>>> We've talked a lot about having a generic framework
> > >> for
> > >>>>>>>>>> all
> > >>>>>>>>>>>>>> errors,
> > >>>>>>>>>>>>>>>>> but I
> > >>>>>>>>>>>>>>>>>>> don't think that is what this KIP is trying to do.
> > >>>>>>>>>>>> Essentially
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> request
> > >>>>>>>>>>>>>>>>>>> is to undo the change from KAFKA-9279
> > >>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9279>
> > >> but
> > >>>>>>>>>>> under
> > >>>>>>>>>>>>>>>> specific
> > >>>>>>>>>>>>>>>>>>> circumstances that are controlled. I really am
> > >>>>>>>>> concerned
> > >>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>> opening
> > >>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>> avenues for bugs with EOS and hesitate to handle any
> > >>>>>>>>>> other
> > >>>>>>>>>>>>> types
> > >>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> errors.
> > >>>>>>>>>>>>>>>>>>> I think if we all agree on the problem that we are
> > >>>>>>>>> trying
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>> solve,
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>> easier to agree on solutions.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Mon, Jul 1, 2024 at 2:20 AM Alieh Saeedi
> > >>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>> Thanks for the valid points you mentioned. I updated
> > >>>>>>>>>> the
> > >>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> PR
> > >>>>>>>>>>>>>>>>>>>> with:
> > >>>>>>>>>>>>>>>>>>>> 1) mentioning that the new overloaded `send` throws
> > >>>>>>>>>>>>>>>>>>> `IllegalStateException`
> > >>>>>>>>>>>>>>>>>>>> if the user tries to ignore `send()` errors outside
> > >>>>>>>>> of
> > >>>>>>>>>> a
> > >>>>>>>>>>>>>>>> transaction.
> > >>>>>>>>>>>>>>>>>>>> 2) the default implementation in `Producer`
> > >> interface
> > >>>>>>>>>>>> throws
> > >>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>> `UnsupportedOperationException`
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Chris,
> > >>>>>>>>>>>>>>>>>>>> Thanks for the feedback. I tried to clarify the
> > >>>>>>>>> points
> > >>>>>>>>>>> you
> > >>>>>>>>>>>>>>> listed:
> > >>>>>>>>>>>>>>>>>>>> -------> we've narrowed the scope from any error
> > >> that
> > >>>>>>>>>>> might
> > >>>>>>>>>>>>>> take
> > >>>>>>>>>>>>>>>>> place
> > >>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>> producing a record to Kafka, to only the ones that
> > >>>>>>>>> are
> > >>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>> directly
> > >>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>> Producer::send;
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>  From the very beginning and even since KIP-1038,
> the
> > >>>>>>>>>> main
> > >>>>>>>>>>>>>> purpose
> > >>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> have "more flexibility," or, in other words, "giving
> > >>>>>>>>>> the
> > >>>>>>>>>>>> user
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> authority" to handle some specific exceptions thrown
> > >>>>>>>>>> from
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> `Producer`.
> > >>>>>>>>>>>>>>>>>>>> Due to the specific cases we had in mind, KIP-1038
> > >>>>>>>>> was
> > >>>>>>>>>>>>>> discarded
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>> decided to not define a `CustomExceptionHandler` for
> > >>>>>>>>>>>>> `Producer`
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>> instead
> > >>>>>>>>>>>>>>>>>>>> treat the `send` failures in a different way. The
> > >>>>>>>>> main
> > >>>>>>>>>>>> issue
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>>> makes a transition to error state, which is
> > >> undoable.
> > >>>>>>>>>> In
> > >>>>>>>>>>>>> fact,
> > >>>>>>>>>>>>>>> one
> > >>>>>>>>>>>>>>>>>> single
> > >>>>>>>>>>>>>>>>>>>> poison pill record makes the whole batch fail. The
> > >>>>>>>>>> former
> > >>>>>>>>>>>>>>>> suggestions
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> you agreed with have been all about un-doing this
> > >>>>>>>>>>>> transition
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>> `flush`
> > >>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>> `commit`. The new suggestion is to un-do (or better,
> > >>>>>>>>>> NOT
> > >>>>>>>>>>>> do)
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>> due
> > >>>>>>>>>>>>>>>>>>>> to the reasons listed in the discussions above.
> > >>>>>>>>>>>>>>>>>>>> Moreover, I would say that having such a large scope
> > >>>>>>>>> as
> > >>>>>>>>>>> you
> > >>>>>>>>>>>>>>>> mentioned
> > >>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> impossible. In the best case, we may have control
> > >>>>>>>>> over
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> `Producer`.
> > >>>>>>>>>>>>>>>>>>> What
> > >>>>>>>>>>>>>>>>>>>> shall we do with the broker? The `any error that
> > >>>>>>>>> might
> > >>>>>>>>>>> take
> > >>>>>>>>>>>>>> place
> > >>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>> producing a record to Kafka` is too much, I think.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> -------> is this all we want to handle, and will it
> > >>>>>>>>>>> prevent
> > >>>>>>>>>>>>> us
> > >>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>> handling more in the future in an intuitive way?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I think yes. This is all we want. Other sorts of
> > >>>>>>>>> errors
> > >>>>>>>>>>>> such
> > >>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>> having
> > >>>>>>>>>>>>>>>>>>>> problem with partition addition, producer fenced
> > >>>>>>>>>>> exception,
> > >>>>>>>>>>>>> etc
> > >>>>>>>>>>>>>>>> seem
> > >>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>> more serious issues. The intention was to handle
> > >>>>>>>>>> problems
> > >>>>>>>>>>>>>> created
> > >>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>> (maybe) a single poison pill record. BTW, I do not
> > >>>>>>>>> see
> > >>>>>>>>>>> any
> > >>>>>>>>>>>>>>>> obstacles
> > >>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> future changes.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Sat, Jun 29, 2024 at 3:03 AM Chris Egerton
> > >>>>>>>>>>>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Ah, sorry--spoke too soon. The PR doesn't show that
> > >>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>> Producer::send are handled, but instead,
> > >>>>>>>>> ApiException
> > >>>>>>>>>>>>>> instances
> > >>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>> caught inside KafkaProducer::doSend and are handled
> > >>>>>>>>>> by
> > >>>>>>>>>>>>>>> returning
> > >>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>> already-failed future are. I think the same
> > >>>>>>>>> question
> > >>>>>>>>>>>> still
> > >>>>>>>>>>>>>>>> applies
> > >>>>>>>>>>>>>>>>>> (is
> > >>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>> all we want to handle, and will it prevent us from
> > >>>>>>>>>>>> handling
> > >>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> future in an intuitive way), though.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 8:57 PM Chris Egerton <
> > >>>>>>>>>>>>>> chr...@aiven.io
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> This KIP has evolved a lot since I last looked at
> > >>>>>>>>>> it,
> > >>>>>>>>>>>> but
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> changes
> > >>>>>>>>>>>>>>>>>>>>> seem
> > >>>>>>>>>>>>>>>>>>>>>> well thought-out both in semantics and API. One
> > >>>>>>>>>>>>> clarifying
> > >>>>>>>>>>>>>>>>>> question I
> > >>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>> is that it looks based on the draft PR that we've
> > >>>>>>>>>>>>> narrowed
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> scope
> > >>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>> any error that might take place with producing a
> > >>>>>>>>>>> record
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> Kafka,
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>> the ones that are thrown directly from
> > >>>>>>>>>>> Producer::send;
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> intended
> > >>>>>>>>>>>>>>>>>>>>>> behavior here? And if so, do you have thoughts on
> > >>>>>>>>>> how
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>>> design a
> > >>>>>>>>>>>>>>>>>>>>>> follow-up KIP that would catch all errors
> > >>>>>>>>>> (including
> > >>>>>>>>>>>> ones
> > >>>>>>>>>>>>>>>>> reported
> > >>>>>>>>>>>>>>>>>>>>>> asynchronously instead of synchronously)? I'd
> > >>>>>>>>> like
> > >>>>>>>>>> it
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>> leave
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> door open for that without painting ourselves
> > >>>>>>>>> into
> > >>>>>>>>>>> too
> > >>>>>>>>>>>>> much
> > >>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>> corner
> > >>>>>>>>>>>>>>>>>>>>>> with the API design for this KIP.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 6:31 PM Matthias J. Sax <
> > >>>>>>>>>>>>>>>>> mj...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> it seems this KIP can just pick between a couple
> > >>>>>>>>>> of
> > >>>>>>>>>>>>>>> tradeoffs.
> > >>>>>>>>>>>>>>>>>>> Adding
> > >>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>> overloaded `send()` as the KIP propose makes
> > >>>>>>>>> sense
> > >>>>>>>>>>> to
> > >>>>>>>>>>>> me
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> provides the cleanest solution compare to there
> > >>>>>>>>>>>> options
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>> discussed.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Given the explicit name of the passed-in option
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>>>> highlights
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> option is for TX only make is pretty clear and
> > >>>>>>>>>>> avoids
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>> `flush()` ambiguity.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Nit: We should make clear on the KIP though,
> > >>>>>>>>> that
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>> `send()`
> > >>>>>>>>>>>>>>>>>>>>>>> overload would throw an `IllegalStateException`
> > >>>>>>>>> if
> > >>>>>>>>>>> TX
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>> used
> > >>>>>>>>>>>>>>>>>>>>>>> (similar to other TX methods like initTx(), etc)
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> About the `Producer` interface, I am not sure
> > >>>>>>>>> how
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>> done
> > >>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> past (eg, KIP-266 added
> > >>>>>>>>> `Consumer.poll(Duration)`
> > >>>>>>>>>>> w/o
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> default
> > >>>>>>>>>>>>>>>>>>>>>>> implementation), if we need a default
> > >>>>>>>>>> implementation
> > >>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>> backward
> > >>>>>>>>>>>>>>>>>>>>>>> compatibility or not? If we do want to add one,
> > >>>>>>>>> I
> > >>>>>>>>>>>> think
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>> appropriate to throw an
> > >>>>>>>>>>>> `UnsupportedOperationException`
> > >>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>> default,
> > >>>>>>>>>>>>>>>>>>>>>>> instead of just keeping the default impl empty?
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> My points are rather minor, and should not block
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>> though.
> > >>>>>>>>>>>>>>>>>>>>>>> Overall LGTM.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On 6/27/24 1:28 PM, Alieh Saeedi wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion.
> > >>>>>>>>>>>>>>>>>>>>>>>> Making applications to validate every single
> > >>>>>>>>>>> record
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> best
> > >>>>>>>>>>>>>>>>>>>>> way,
> > >>>>>>>>>>>>>>>>>>>>>>>> from an efficiency point of view.
> > >>>>>>>>>>>>>>>>>>>>>>>> Moreover, between changing the behavior of the
> > >>>>>>>>>>>>> Producer
> > >>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>> `commitTnx`, the former seems more reasonable
> > >>>>>>>>>> and
> > >>>>>>>>>>>>> clean.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 8:14 PM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I see there are two options now. So folks
> > >>>>>>>>> will
> > >>>>>>>>>> be
> > >>>>>>>>>>>>>>>> discussing
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> approaches
> > >>>>>>>>>>>>>>>>>>>>>>>>> and deciding the best way forward before we
> > >>>>>>>>>> vote?
> > >>>>>>>>>>>>>>>>>>>>>>>>> I do think there could be a problem with the
> > >>>>>>>>>>>> approach
> > >>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>> get
> > >>>>>>>>>>>>>>>>>>>>>>>>> stuck on an earlier error and have more
> > >>>>>>>>> records
> > >>>>>>>>>>>>>>>> (potentially
> > >>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>>>>> partitions) to commit as the current PR is
> > >>>>>>>>>>>>> implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> I guess this takes us back to the question of
> > >>>>>>>>>>>> whether
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>> cleared on send.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> (And I guess at the back of my mind, I'm
> > >>>>>>>>>>> wondering
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>>>>>> we can
> > >>>>>>>>>>>>>>>>>>>>>>>>> validate the "posion pill" records
> > >>>>>>>>> application
> > >>>>>>>>>>> side
> > >>>>>>>>>>>>>>> before
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>>> try
> > >>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>> send them)
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 4:38 PM Alieh Saeedi
> > >>>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I did not update the KIP with
> > >>>>>>>>> `TxnSendOption`
> > >>>>>>>>>>>> since
> > >>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>> thought
> > >>>>>>>>>>>>>>>>>>> it'd
> > >>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> better discussed here beforehand.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> right now, there are 2 PRs:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> - the PR that implements the current version
> > >>>>>>>>>> of
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> KIP:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16332
> > >>>>>>>>>>>>>>>>>>>>>>>>>> - the POC PR that clarifies the
> > >>>>>>>>>> `TxnSendOption`:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16465
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 12:42 AM Justine
> > >>>>>>>>>> Olshan
> > >>>>>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I think I am a little confused. Are the 3
> > >>>>>>>>>>> points
> > >>>>>>>>>>>>>> above
> > >>>>>>>>>>>>>>>>>>> addressed
> > >>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> or did something change? The PR seems to
> > >>>>>>>>> not
> > >>>>>>>>>>>>> include
> > >>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>> change
> > >>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>> still
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> has the CommitOption as well.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 2:15 PM Alieh
> > >>>>>>>>> Saeedi
> > >>>>>>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking at the PR <
> > >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16332
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> corresponding to the KIP, there are some
> > >>>>>>>>>>> points
> > >>>>>>>>>>>>>> worthy
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>> mention:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) clearing the error ends up dirty/messy
> > >>>>>>>>>> code
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>> `TransactionManager`.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) By clearing the error, we are actually
> > >>>>>>>>>>> doing
> > >>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>> illegal
> > >>>>>>>>>>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> `ABORTABLE_ERROR` to `IN_TRANSACTION`
> > >>>>>>>>> which
> > >>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> conceptually
> > >>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> acceptable.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> This can be the root cause of some issues,
> > >>>>>>>>>>> with
> > >>>>>>>>>>>>>>> perhaps
> > >>>>>>>>>>>>>>>>>>> further
> > >>>>>>>>>>>>>>>>>>>>>>>>> future
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> changes by others.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) If the poison pill record `r1` causes a
> > >>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> and then the next record `r2` requires
> > >>>>>>>>>> adding
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> partition
> > >>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction, the action fails due to being
> > >>>>>>>>>> in
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>> state.
> > >>>>>>>>>>>>>>>>>>>> In
> > >>>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> case, clearing errors during
> > >>>>>>>>>>>>>>>> `commitTnx(CLEAR_SEND_ERROR)`
> > >>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> too
> > >>>>>>>>>>>>>>>>>>>>>>>>> late.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> However, this case can NOT be the main
> > >>>>>>>>>> concern
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>> soon
> > >>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>> KIP-890
> > >>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> fully
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> My suggestion is to solve the problem
> > >>>>>>>>> where
> > >>>>>>>>>> it
> > >>>>>>>>>>>>>> arises.
> > >>>>>>>>>>>>>>>> If
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the error state does not happen during
> > >>>>>>>>>>> `send()`,
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>> need
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> clear
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> the error later. Therefore, instead of
> > >>>>>>>>>>>>>> `CommitOption`,
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>> define
> > >>>>>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> `TxnSendOption` and prevent the `send()`
> > >>>>>>>>>>> method
> > >>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>> going
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> state in case 1) we're in a transaction
> > >>>>>>>>> and
> > >>>>>>>>>> 2)
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>> asked
> > >>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> IGONRE_SEND_ERRORS. For more clarity, you
> > >>>>>>>>>> can
> > >>>>>>>>>>>>> take a
> > >>>>>>>>>>>>>>>> look
> > >>>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> POC
> > >>>>>>>>>>>>>>>>>>>>>>>>> PR
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> <
> > >>>>>>>>> https://github.com/apache/kafka/pull/16465
> > >>>>>>>>>>> .
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Reply via email to