Hi Greg,

I'm sorry if anything I said sounded like I was trying to minimize the
concerns; that's definitely not my intention. My background is in
databases, and I share your concern about muddying transaction semantics
(which, as you pointed out, are already different in Kafka).

If some kind of record pre-validation method (called explicitly before
send()) is acceptable to KStreams, that would be my preference as well,
because IMO it expresses the intent of the desired functionality.

One suggestion I would like to make is to not name it "prepare", as there
will be a prepareTransaction method once
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
is implemented, and it could be confusing to have two methods named
"prepare" that mean completely different things.

Maybe name it validateRecord?

-Artem

On Fri, Jul 19, 2024 at 4:19 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Artem and Matthias,
>
> > On the other hand, the effort to prove that
> > keeping all records in memory won't break some scenarios (and generally
> > breaking one is enough to cause a lot of pain) seems to be significantly
> > higher than to prove that setting some flag in some API has pretty much 0
> > chance of regression
>
> > in the end, why buffer records twice?
>
> > This way we don't
> > ignore the error, we're just changing the way they are delivered.
>
> > Very clean semantics
> > which should also address the concern of "non-atomic tx"
>
> I feel like my concerns are being minimized instead of being addressed
> in this discussion, and if that's because I'm not expressing them clearly,
> I apologize.
>
> Many users come to Kafka with prior expectations, especially when we use
> industry-standard terminology like "Exactly Once Semantics",
> "Transactions", "Commit", "Abort". Of course Kafka isn't an ACID-compliant
> database, but users will evaluate, discuss, and develop applications with
> Kafka through the lens of the ACID principles, because that is the
> framework most commonly applied to transactional semantics.
> The original design of KIP-98 [1] explicitly mentions atomic commits (with
> the same meaning as the A in ACID) as the primary abstraction being added
> (reproduced here):
>
> > At the core, transactional guarantees enable applications to produce to
> multiple TopicPartitions atomically, ie. all writes to these
> TopicPartitions will succeed or fail as a unit.
> > Further, since consumer progress is recorded as a write to the offsets
> topic, the above capability is leveraged to enable applications to batch
> consumed and produced messages into a single atomic unit, ie. a set of
> messages may be considered consumed only if the entire
> ‘consume-transform-produce’ executed in its entirety.
>
> I think it's important to say that to a user, "writes" really means "send()
> and commitOffsets() calls", not literal produce requests to Kafka brokers,
> and "consume-transform-produce" really means "poll(), transform, send()".
> This is because to a user, the implementation within poll() and send() and
> the broker are none of their concern, and are intended to be within the
> abstraction.
> When I say that this feature is a non-atomic commit, I mean that this
> feature does not fit the above description, and breaks the transaction
> abstraction in a meaningful way. No longer do all writes succeed or fail as
> a unit; some failures are permitted to drop data. No longer must a
> consume-transform-produce cycle be executed in its entirety; some parts may
> be left incomplete.
> This means that this method will be difficult to define ("which exceptions
> are covered?"), difficult to document ("how do we explain
> 'not-really-atomic commits' clearly and unambiguously to a potential
> user?"), and difficult to compose ("if someone turns this option on, how
> does that affect delivery guarantees and opportunities for bugs in
> upper layers?").
> Users currently rely heavily on analogies to other database systems to make
> sense of Kafka's transactions, and we need to use that to our benefit,
> rather than designing in spite of it being true.
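To make the distinction concrete, here is a toy model (not Kafka code; all names are hypothetical) contrasting an atomic commit, where all writes succeed or fail as a unit as KIP-98 describes, with a commit that drops failed writes and commits the rest. A "write" is just a string, and a predicate stands in for errors such as a too-large record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only (not Kafka code). A "write" is a string; `fails` decides
// which writes fail, standing in for errors such as a too-large record.
final class AtomicitySketch {

    // Atomic commit: if any write fails, the whole unit is discarded.
    static List<String> atomicCommit(List<String> writes, Predicate<String> fails) {
        for (String w : writes) {
            if (fails.test(w)) {
                return List.of(); // abort: nothing becomes visible
            }
        }
        return List.copyOf(writes); // commit: every write becomes visible
    }

    // Hypothetical "drop failed writes" commit: failures are skipped and
    // the remaining writes are committed anyway.
    static List<String> dropFailedCommit(List<String> writes, Predicate<String> fails) {
        List<String> committed = new ArrayList<>();
        for (String w : writes) {
            if (!fails.test(w)) {
                committed.add(w);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> writes = List.of("a", "TOO_LARGE", "b");
        Predicate<String> fails = w -> w.startsWith("TOO_LARGE");
        System.out.println(atomicCommit(writes, fails));     // prints []
        System.out.println(dropFailedCommit(writes, fails)); // prints [a, b]
    }
}
```

With the same input, the atomic version commits nothing while the non-atomic version commits a partial result, which is the behavioral difference users would have to reason about.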
>
> However this atomicity guarantee isn't always desirable, as evidenced by
> the original bug report [2]. If you're interacting with a website form for
> example, and a database transaction fails because one of your strings is
> oversize, you don't need to re-input all of your form responses from
> scratch, as there is an application layer/browser in-between to preserve
> the state and retry the transaction.
> And while you could make a convenience/performance/etc argument in that
> situation ("The database should truncate/null-out the oversize string") and
> modern databases often have very expressive DML that would permit such a
> behavior (try-catch, etc), the End-to-End arguments [3] make me believe
> that is a bad design and should be discouraged.
> To that end, I was suggesting ways to push this farther and farther up the
> stack, such as performing record size estimation. This doesn't mean that it
> can't be added at a low level of abstraction, just that we need to make
> sure to exhaust all other alternatives, and justify it with a performance
> benefit.
>
> I was holding off on discussing the literal design until you provided
> concrete performance justification, but to progress the discussion while
> I'm waiting for that, I can give my thoughts:
>
> I don't think an overloaded send() method is appropriate given that this
> appears to be a niche use-case, and the send() method itself is probably
> the single most important method in the Clients library. The KIP-98 design
> was a much more substantial change to the Producer than this KIP, and it
> found a way to preserve the original type signature (but added an
> exception).
> Users picking up the Producer for the first time may see this additional
> method, and may spend time trying to understand whether it is something
> suitable for their use-case. In the best case, they ignore it and use the
> other two signatures. But it is also possible that they will use it without
> understanding it, and have unexpected data loss. Overall, this feels like a
> net negative to the producer user-base.
> Also boolean, enum, vararg, flags, etc don't follow the current trend of
> the AdminClient passing Options DTOs for modifying individual API calls,
> which is a much more extensible design.
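For comparison with a boolean or var-arg parameter, a per-call Options DTO might look like the sketch below. Everything here is hypothetical: there is no SendOptions class or send(record, options) overload in kafka-clients. It borrows the flag name throwImmediately from the overload proposed in this thread and the fluent setter/getter style of AdminClient options classes such as CreateTopicsOptions (validateOnly(boolean) / shouldValidateOnly()).

```java
// Hypothetical: kafka-clients has no SendOptions class or send(record, options)
// overload. This only mirrors the fluent setter/getter style of AdminClient
// options classes such as CreateTopicsOptions.
final class SendOptionsSketch {

    static final class SendOptions {
        private boolean throwImmediately = false; // default preserves today's behavior

        // Fluent setter: returns this so calls can be chained.
        SendOptions throwImmediately(boolean throwImmediately) {
            this.throwImmediately = throwImmediately;
            return this;
        }

        boolean shouldThrowImmediately() {
            return throwImmediately;
        }

        // Later per-call settings would be added here as new fields and
        // setters, without another send() overload.
    }

    public static void main(String[] args) {
        SendOptions defaults = new SendOptions();
        SendOptions eager = new SendOptions().throwImmediately(true);
        System.out.println(defaults.shouldThrowImmediately()); // prints false
        System.out.println(eager.shouldThrowImmediately());    // prints true
    }
}
```

The design point is that new per-call knobs become new fields on the options object, so the send() signature stays stable.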
>
> If there was a way of preparing a record before calling send() that did
> some or all of the pre-send validation (serialization, size checking, maybe
> topic-partition existence, authorization, etc.) that could be a
> reasonable way to emit these sorts of errors in a way that makes it clear
> that sending hasn't started and the record is not part of the transaction
> yet. I'm imagining something like:
>
> ```
> // Actual instantiated class and methods could be an implementation detail.
> public abstract class PreparedRecord<K, V> extends ProducerRecord<K, V> {
>     protected PreparedRecord(String topic, K key, V value) {
>         super(topic, key, value);
>     }
> }
>
> interface Producer<K, V> {
>     // Hypothetical: serializes and validates the record without adding it
>     // to the transaction.
>     Optional<PreparedRecord<K, V>> prepare(ProducerRecord<K, V> record,
>             PrepareOptions options)
>             throws SerializationException, RecordTooLargeException;
> }
>
> // Application code:
> Optional<PreparedRecord<byte[], byte[]>> prepared = Optional.empty();
> try {
>     prepared = producer.prepare(record, null);
> } catch (Exception e) {
>     if (critical(e)) {
>         producer.abortTransaction();
>         return;
>     }
>     // Non-critical: the record was never part of the transaction, so it
>     // is simply left out.
> }
> if (prepared.isPresent()) {
>     // Errors from here on are propagated via commitTransaction(), which
>     // also aborts the transaction.
>     producer.send(prepared.get());
> }
> ```
>
> Semantically then, it would make sense that the data which has been
> "prepared to send" but has not been "sent" is intentionally left out of the
> batch by the application control-flow, not an option passed into the
> producer changing the control-flow within the Producer. If passed a
> prepared record, the send method can then be responsible only for waiting
> for sufficient buffer capacity, reusing the work done by the prepare()
> method.
> This prepare method could also enable other use-cases, like parallel
> serialization, exception-less handling, or more advanced buffering/balking
> behavior ("refuse to prepare records that can't be sent without blocking").
> And because the synchronous processing is made explicit, it's much easier
> to communicate to users which exceptions are covered and can be prevented
> from causing transaction aborts.
> If someone uses the method, or doesn't use it, they aren't put at risk of
> compromising their delivery guarantees unexpectedly.
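The control flow described above can be modelled in a few lines. This is a minimal, self-contained sketch under stated assumptions: PreparedRecord, prepare() and ToyProducer are hypothetical names, not kafka-clients API, and an IllegalArgumentException stands in for RecordTooLargeException. It shows the two properties argued for: a record rejected by prepare() never enters the batch, and send() reuses the serialized bytes instead of serializing again.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of the prepare()/send() split. PreparedRecord, prepare() and
// ToyProducer are hypothetical, and IllegalArgumentException stands in for
// RecordTooLargeException so the sketch has no kafka-clients dependency.
final class PrepareSketch {

    static final class PreparedRecord {
        final byte[] serializedValue;

        private PreparedRecord(byte[] serializedValue) {
            this.serializedValue = serializedValue;
        }
    }

    static final class ToyProducer {
        final List<byte[]> batch = new ArrayList<>();
        int serializations = 0;
        private final int maxRecordBytes;

        ToyProducer(int maxRecordBytes) {
            this.maxRecordBytes = maxRecordBytes;
        }

        // All synchronous, fallible work happens here; nothing is batched yet.
        PreparedRecord prepare(String value) {
            serializations++;
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > maxRecordBytes) {
                throw new IllegalArgumentException("record too large: " + bytes.length + " bytes");
            }
            return new PreparedRecord(bytes);
        }

        // Only enqueues the bytes computed by prepare(); no second serialization.
        void send(PreparedRecord record) {
            batch.add(record.serializedValue);
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(8);
        for (String value : List.of("small", "much-too-large", "tiny")) {
            try {
                producer.send(producer.prepare(value));
            } catch (IllegalArgumentException e) {
                // Application control flow leaves this record out of the batch.
                System.out.println("skipped: " + e.getMessage());
            }
        }
        System.out.println(producer.batch.size() + " batched, "
                + producer.serializations + " serializations"); // prints "2 batched, 3 serializations"
    }
}
```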
>
> Thanks,
> Greg
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> [2] https://issues.apache.org/jira/browse/KAFKA-15259
> [3] https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf
>
> On Fri, Jul 19, 2024 at 12:39 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > For catching client side errors this would work IMHO. I am ok with this.
> >
> > We throw before we add the record to the batch. Very clean semantics
> > which should also address the concern of "non-atomic tx"... The
> > exception clearly indicates that the record was not added to the TX, and
> > users can react to it accordingly.
> >
> > We did discuss this idea previously, but did not have a good proposal to
> > make it backward compatible. The newly proposed overload would address
> > this issue of backward compatibility.
> >
> > Of course, it might not make it easily extensible in the future for
> > broker side errors, but it's unclear anyway right now, if we would even
> > get to a solution for broker side errors or not -- so maybe it's ok to
> > accept this and drop/ignore the broker side error question for now.
> >
> >
> >
> > A small follow up thought/question: instead of using a boolean, would we
> > actually want to make it a var-arg enum to allow users to enable this
> > for certain errors explicitly and individually? Beside the added
> > flexibility and fine grain control, a var-arg enum would also make the
> > API nicer/cleaner IMHO compared to a boolean.
> >
> > For convenience, this enum could have an additional `ALL` option (and we
> > would call out that if `ALL` is used, new error types might be added in
> > a future release, making the code less safe/robust -- ie, use at your own
> > risk only)
> >
> > This way, we also explicitly document in the KIP which exceptions might
> > be thrown, as we would add an enum value for each error type explicitly,
> > and also make it future-proof for new error types we want to cover --
> > each addition would require a KIP to extend the enum.
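A sketch of how the var-arg enum might behave, assuming it selects which error types are thrown directly from send() (as in the throwImmediately proposal) rather than deferred to the Future. The enum, its constants and the helper names are hypothetical; only two concrete error types are shown, and ALL expands to every other constant the client library defines.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical API shape for the var-arg enum idea; neither SendErrorType nor
// these helpers exist in kafka-clients.
final class ErrorOptInSketch {

    enum SendErrorType {
        RECORD_TOO_LARGE,
        SERIALIZATION,
        ALL // "use at your own risk": also covers types added in later releases
    }

    // Expands the caller's var-args into the concrete set of opted-in error types.
    static Set<SendErrorType> optedIn(SendErrorType... types) {
        EnumSet<SendErrorType> result = EnumSet.noneOf(SendErrorType.class);
        for (SendErrorType type : types) {
            if (type == SendErrorType.ALL) {
                result.addAll(EnumSet.complementOf(EnumSet.of(SendErrorType.ALL)));
            } else {
                result.add(type);
            }
        }
        return result;
    }

    // True if an error of this type should be thrown from send() instead of
    // being deferred to the Future and aborting the transaction.
    static boolean throwsFromSend(SendErrorType error, SendErrorType... optIn) {
        return optedIn(optIn).contains(error);
    }

    public static void main(String[] args) {
        System.out.println(throwsFromSend(SendErrorType.RECORD_TOO_LARGE,
                SendErrorType.RECORD_TOO_LARGE)); // prints true
        System.out.println(throwsFromSend(SendErrorType.SERIALIZATION,
                SendErrorType.RECORD_TOO_LARGE)); // prints false
        System.out.println(throwsFromSend(SendErrorType.SERIALIZATION,
                SendErrorType.ALL));              // prints true
    }
}
```

Because ALL expands to whatever constants the client library defines, upgrading the client can silently widen the set, which is the risk the `ALL` caveat is meant to call out.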
> >
> >
> >
> > -Matthias
> >
> >
> > On 7/18/24 10:33 PM, Artem Livshits wrote:
> > > Hey folks,
> > >
> > > Hopefully not to make this KIP go for another spin :-), but I thought of
> > > a modification that might actually address safety concerns over using
> > > flags to ignore a vaguely specified class of errors.
> > >
> > > What if we had the following overload of the .send method:
> > >
> > >    Future<RecordMetadata> send(ProducerRecord record, Callback callback,
> > >                                boolean throwImmediately)
> > >
> > > and if throwImmediately=false, then we behave the same way as now (return
> > > errors via Future and poison the transaction), and if
> > > throwImmediately=true, then we just throw errors immediately from the
> > > send function. This way we don't ignore the error, we're just changing
> > > the way they are delivered. Then KStreams can catch the error for
> > > send(record, callback, true) and do whatever it needs to do.
> > >
> > > -Artem
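A toy model of the two delivery modes in this proposal, to make the difference explicit: with throwImmediately=true the caller catches the error and the record never joins the transaction; with false, the error is deferred to the Future and the commit fails. Everything below is hypothetical: the callback parameter is omitted, and string length stands in for the producer's serialized-size check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Toy model of the proposed send(record, callback, throwImmediately) overload.
// All names are hypothetical (the callback is omitted); record length in chars
// stands in for the serialized-size check the real producer performs.
final class ThrowImmediatelySketch {

    static final class RecordTooLargeError extends RuntimeException {
        RecordTooLargeError(String message) {
            super(message);
        }
    }

    static final class ToyProducer {
        private final int maxRecordChars;
        private final List<String> batch = new ArrayList<>();
        private RuntimeException poison; // first deferred error, if any

        ToyProducer(int maxRecordChars) {
            this.maxRecordChars = maxRecordChars;
        }

        Future<Void> send(String record, boolean throwImmediately) {
            if (record.length() > maxRecordChars) {
                RecordTooLargeError error =
                        new RecordTooLargeError("record of " + record.length() + " chars");
                if (throwImmediately) {
                    throw error; // the record never joined the transaction
                }
                if (poison == null) {
                    poison = error; // current behavior: the transaction must abort
                }
                return CompletableFuture.failedFuture(error);
            }
            batch.add(record);
            return CompletableFuture.completedFuture(null);
        }

        // Returns the number of committed records, or fails if a deferred
        // error poisoned the transaction.
        int commitTransaction() {
            if (poison != null) {
                throw new IllegalStateException("transaction must be aborted", poison);
            }
            int committed = batch.size();
            batch.clear();
            return committed;
        }
    }

    public static void main(String[] args) {
        ToyProducer eager = new ToyProducer(5);
        eager.send("ok", true);
        try {
            eager.send("too-long", true);
        } catch (RecordTooLargeError e) {
            System.out.println("caught in caller: " + e.getMessage());
        }
        System.out.println("committed " + eager.commitTransaction()); // prints "committed 1"
    }
}
```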
> > >
> > >
> > > On Mon, Jul 15, 2024 at 4:30 PM Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > >
> > >> Matthias,
> > >>
> > >> Thank you for rejecting my suggested alternatives. Your responses are the
> > >> sorts of things I expected to see summarized in the text of the KIP.
> > >>
> > >> I agree with most of your rejections, except this one:
> > >>
> > >>> "Estimation" is not sufficient, but we would need to know it exactly.
> > >>> And that's an impl detail, given that the message format could change
> > >>> and we could add new internal fields increasing the message size.
> > >>
> > >> An estimate is certainly going to have an error. But an estimate
> > >> shouldn't be treated as exact anyway; there should be an error bound, or
> > >> "safety factor", used when interpreting it. For example, if the
> > >> broker-side limit is 1MB, and an estimate could be wrong by ~10%, then
> > >> computing an estimate and dropping records >900KB should be sufficient
> > >> to prevent RTLEs.
> > >> This is the sort of estimation that I would expect application
> > >> developers could implement, without knowing the exact serialization and
> > >> protocol overhead. This could prevent user-originated oversize records
> > >> from making it to the producer.
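A minimal sketch of that safety-factor check, assuming the application's estimate is just the UTF-8 byte count of key and value (ignoring headers, record/batch framing and compression, which is why the margin is needed). The 1 MiB limit and 10% error bound are the example numbers above; the class and method names are hypothetical. With a 1 MiB limit the threshold works out to 943,718 bytes, roughly the ~900KB in the example.

```java
import java.nio.charset.StandardCharsets;

// Sketch of application-side size estimation with a safety factor.
// estimate() only counts UTF-8 payload bytes; that is an assumption, not
// Kafka's actual record-size computation.
final class SizeEstimateSketch {

    static long estimate(String key, String value) {
        return key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
    }

    // Largest estimate still accepted: limit * (1 - errorBound).
    static long threshold(long brokerLimitBytes, double errorBound) {
        return (long) (brokerLimitBytes * (1.0 - errorBound));
    }

    // Keep the record only if its estimate stays under the safety threshold.
    static boolean shouldSend(String key, String value, long brokerLimitBytes, double errorBound) {
        return estimate(key, value) <= threshold(brokerLimitBytes, errorBound);
    }

    public static void main(String[] args) {
        long limit = 1024 * 1024;                 // 1 MiB
        String small = "x".repeat(800 * 1024);    // ~800 KiB
        String large = "x".repeat(1000 * 1024);   // ~1000 KiB, close to the limit
        System.out.println(threshold(limit, 0.10));               // prints 943718
        System.out.println(shouldSend("k", small, limit, 0.10));  // prints true
        System.out.println(shouldSend("k", large, limit, 0.10));  // prints false
    }
}
```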
> > >>
> > >> Thanks,
> > >> Greg
> > >>
> > >>
> > >> On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>
> > >>> I agree with Alieh and Artem -- in the end, why buffer records twice?
> > >>> We effectively want to allow pushing some error handling (which I btw
> > >>> consider "business logic") into the producer. IMHO, there is nothing
> > >>> wrong with it. Dropping a poison pill record is not really a violation
> > >>> of atomicity from my POV, but a business logic decision to not include
> > >>> a record in a transaction -- the proposed API just makes it much
> > >>> simpler to achieve this business logic goal.
> > >>>
> > >>>
> > >>>
> > >>> For memory size estimation, throughput or message size is actually not
> > >>> relevant, right? We would need to look at producer buffer size, ie,
> > >>> `batch.size`, `max.in.flight.requests.per.connection` and guesstimate
> > >>> the number of connections there might be? At least for KS, we don't
> > >>> need to buffer everything until commit, but only until we get a
> > >>> successful "ack" back.
> > >>>
> > >>> Note that a KS application not only needs to write to (a single) user
> > >>> result topic, but to multiple output topics, as well as repartition and
> > >>> changelog topics, across all tasks assigned to a thread (ie, producer),
> > >>> which can easily be 10 tasks or more. If we assume topics with 30
> > >>> partitions (topics with 50 or more partitions are not uncommon either),
> > >>> and a producer that must write to 10 different topics, the number of
> > >>> required connections very quickly gets very high, and thus the required
> > >>> "application buffer space" would be significant.
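As a rough illustration of the "buffer only until ack" sizing, here is a back-of-envelope upper bound on in-flight data for one producer. It assumes records smaller than batch.size and at most max.in.flight.requests.per.connection unacknowledged batches per partition (each produce request carries at most one batch per partition). The 10 topics x 30 partitions come from the example above; 16384 bytes and 5 are the producer defaults for batch.size and max.in.flight.requests.per.connection. Batches still queued in the producer's accumulator are not counted here; the producer's buffer.memory setting (32 MiB by default) bounds those separately.

```java
// Back-of-envelope bound; assumes records smaller than batch.size and at most
// maxInFlight unacknowledged batches per partition.
final class AckBufferSketch {

    static long inFlightUpperBoundBytes(int topics, int partitionsPerTopic,
                                        int batchSizeBytes, int maxInFlight) {
        long partitions = (long) topics * partitionsPerTopic;
        return partitions * maxInFlight * batchSizeBytes;
    }

    public static void main(String[] args) {
        // 10 topics x 30 partitions, batch.size = 16384, max.in.flight = 5
        long bytes = inFlightUpperBoundBytes(10, 30, 16_384, 5);
        System.out.println(bytes + " bytes"); // prints "24576000 bytes"
        System.out.printf("~%.1f MiB%n", bytes / (1024.0 * 1024.0));
    }
}
```

At defaults that is roughly 23 MiB per producer; raising batch.size scales the bound linearly.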
> > >>>
> > >>>
> > >>>
> > >>> Your other ideas seem not to be viable alternatives:
> > >>>
> > >>>> Streams users that specifically want to drop oversize records can
> > >>>> estimate the size of their data and drop records which are too
> > >>>> large, enforcing their own limits that are lower than the Kafka limits.
> > >>>
> > >>> "Estimation" is not sufficient, but we would need to know it exactly.
> > >>> And that's an impl detail, given that the message format could change
> > >>> and we could add new internal fields increasing the message size. The
> > >>> idea to add some `producer.serializedRecordSize()` helper method was
> > >>> discussed, but it's a very ugly API and clumsy to use -- also, the user
> > >>> code would need to know the producer config which it might not have
> > >>> access to (as it might get passed in from some config file; and it
> > >>> might also be changed).
> > >>>
> > >>> Some other alternative we also discussed was to let `send()` throw an
> > >>> exception for a "record too large" case directly. However, this
> > >>> solution raises backward compatibility concerns, and it might also not
> > >>> help us to extend the solution in the future (eg, tackle broker side
> > >>> errors). So we discarded this idea.
> > >>>
> > >>>
> > >>>
> > >>>> Streams users that want CONTINUE semantics can use at_least_once
> > >>>> semantics
> > >>>
> > >>> Not really. EOS is mainly about not having duplicates in the result,
> > >>> but at-least-once cannot provide this guarantee. (Even if I repeat
> > >>> myself: dropping a poison pill record based on a business logic
> > >>> decision is not data loss, but effectively a business logic filter...)
> > >>>
> > >>>
> > >>>
> > >>>> Streams itself can store record hashes/coordinates and fast rewind to
> > >>>> the end of the last transaction, recomputing data rather than storing
> > >>>> it.
> > >>>
> > >>> Given the very complex nature of topologies, with joins, aggregations,
> > >>> flatmaps etc, this is a 100x more complex solution and not viable in
> > >>> practice.
> > >>>
> > >>>
> > >>>
> > >>>> Streams can define exactly_once + CONTINUE semantics to permit the
> > >>>> whole transaction to be dropped, because it would allow the next batch
> > >>>> to be started processing.
> > >>>
> > >>> Would this not be much worse? I have a single poison pill record and
> > >>> would need to drop a full tx (this could be tens of thousands of
> > >>> records...). Also, given that KS writes into changelog topics in the
> > >>> same TX, this could break the whole application.
> > >>>
> > >>>
> > >>>
> > >>>> Streams can emit records with both a transactional and
> > >>>> non-transactional producer if some records are not critical-path
> > >>>
> > >>> We (1) already have a "too many connections" problem with KS, so using
> > >>> more clients is something we try to avoid (and we actually hope to
> > >>> reduce the number of clients and connections mid to long term), (2)
> > >>> this would be very hard to express at the API level to the user, and
> > >>> (3) it would provide very weird semantics.
> > >>>
> > >>>
> > >>>
> > >>>> they should optimize for smaller transactions,
> > >>>
> > >>> IMHO, this would not work in practice because transactions have a high
> > >>> overhead and the commit interval is used to trade off throughput vs
> > >>> end-to-end latency. Given certain throughput requirements, it would not
> > >>> be possible to just use a lower commit interval to reduce memory
> > >>> requirements.
> > >>>
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On 7/15/24 2:25 PM, Artem Livshits wrote:
> > >>>> Hi Greg,
> > >>>>
> > >>>>> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary
> > >>>>> set of error conditions that may be expanded in the future, possibly
> > >>>>> to cover the broker side RecordTooLargeException.
> > >>>>
> > >>>> I don't think it contradicts what I said (the keyword here is "in the
> > >>>> future") -- with the current functionality, the correct way to handle
> > >>>> RTLE is by only letting the client ignore client-originated RTLE (this
> > >>>> can be easily implemented on the client side). In the future, we can
> > >>>> improve on that by making the broker return a different exception for
> > >>>> the batch-too-large case; then the producer would be able to return
> > >>>> broker side exceptions as well (and if the application chooses to
> > >>>> ignore them, it will be able to, but it would be an explicit choice
> > >>>> rather than ignoring them by mistake). In this case the producer client
> > >>>> would encapsulate backward compatibility logic when it connects to
> > >>>> older brokers, to make sure the application doesn't accidentally get
> > >>>> RTLE originated by the old broker. This functionality is obviously
> > >>>> more involved and we'll need to see if going all the way is justified,
> > >>>> but the partial client-only solution doesn't close the door.
> > >>>>
> > >>>> So one way to look at the current situation is the following:
> > >>>>
> > >>>> 1. We can do a low effort partial solution to solve a real existing
> > >>>> problem. We can easily prove that it would do exactly what it needs to
> > >>>> do with minimal risk of regression.
> > >>>> 2. We have a path to a more comprehensive solution, so if we justify
> > >>>> the effort required for that, we can get there.
> > >>>>
> > >>>> BTW, as a side note (I think I saw a question in the thread), we do
> > >>>> try to introduce error categories here
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > >>>> so eventually we may have a better classification for the errors.
> > >>>>
> > >>>>> "if a streams producer is producing 1MB/s, and the commit interval is
> > >>>>> 1 hour, I expect 3600MB of additional heap needed ...
> > >>>>
> > >>>> Agree, that would be ideal. On the other hand, the effort to prove that
> > >>>> keeping all records in memory won't break some scenarios (and generally
> > >>>> breaking one is enough to cause a lot of pain) seems to be
> > >>>> significantly higher than to prove that setting some flag in some API
> > >>>> has pretty much 0 chance of regression (we basically have a flag to say
> > >>>> "unfix KAFKA-9279", so we're getting to a fairly "known good" state).
> > >>>> I'll let KStream folks comment on this one (and we still need to solve
> > >>>> the problem of accidental handling of RTLE originated from the broker,
> > >>>> so some KIP would be required to somehow help to differentiate those).
> > >>>>
> > >>>> -Artem
> > >>>>
> > >>>> On Mon, Jul 15, 2024 at 1:31 PM Greg Harris <greg.har...@aiven.io.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Artem,
> > >>>>>
> > >>>>> Thank you for clarifying, as I'm joining the conversation late and may
> > >>>>> have some misconceptions.
> > >>>>>
> > >>>>>> Because of this, a more "complete" solution that
> > >>>>>> allows ignoring RecordTooLargeException regardless of its origin is
> > >>>>>> actually incorrect, while a "partial" solution that allows ignoring
> > >>>>>> RecordTooLargeException only originating in client code accomplishes
> > >>>>>> the required functionality.
> > >>>>>
> > >>>>> This is not how I understood this feature. Above Matthias said the
> > >>>>> following:
> > >>>>>
> > >>>>>> We can do
> > >>>>>> follow up KIP for other errors on an on-demand basis and fix-forward /
> > >>>>>> enlarge the scope successively.
> > >>>>>
> > >>>>> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary
> > >>>>> set of error conditions that may be expanded in the future, possibly
> > >>>>> to cover the broker side RecordTooLargeException.
> > >>>>>
> > >>>>>> Obviously, we could solve this problem by changing logic in the
> > >>>>>> broker to return a different error when the batch is too large, but
> > >>>>>> right now this is not the case
> > >>>>>
> > >>>>> If the broker/wire protocol isn't ready for these errors to be
> > >>>>> propagated, then I don't think we're ready to add this API. It's going
> > >>>>> to be under-generalized, and there's a decent chance that we're going
> > >>>>> to regret the design choices in the future. And users that expect it
> > >>>>> to be fully generalized are going to be disappointed when they don't
> > >>>>> read the fine print and still get faulted by non-covered errors.
> > >>>>>
> > >>>>>> AL2. In a high performance system, "just an optimization" can be a
> > >>>>>> functional requirement ...
> > >>>>>> I just wanted to make the point that we shouldn't necessarily dismiss
> > >>>>>> API changes that allow for optimizations.
> > >>>>>
> > >>>>> My earlier statement didn't dismiss this feature as "just an
> > >>>>> optimization"; actually, the opposite. I said that performance could
> > >>>>> be a justification, but only if it is quantified and stated
> > >>>>> explicitly. We shouldn't be voting on hand-wavy optimizations; we
> > >>>>> should be voting on things that are quantifiable.
> > >>>>> For example, an analysis like the following would facilitate further
> > >>>>> discussion: "if a streams producer is producing 1MB/s, and the commit
> > >>>>> interval is 1 hour, I expect 3600MB of additional heap needed per
> > >>>>> producer". We can then discuss whether we expect higher or lower
> > >>>>> throughput, commit intervals, or heap usage to determine what the
> > >>>>> operating envelope of this feature could be.
> > >>>>> If there are a substantial number of users that have high throughput,
> > >>>>> long commit intervals, _and_ RTLEs, then this feature could make
> > >>>>> sense. If not, then the downsides of this feature (complication of the
> > >>>>> API, under-specification of the error coverage, etc.) look
> > >>>>> unjustified. In fact, if the number of users regularly encountering
> > >>>>> RTLEs is sufficiently small, I would strongly advocate for an
> > >>>>> application-specific workaround instead of trying to fix this in
> > >>>>> Streams, or make memory buffering an optional feature in Streams.
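The arithmetic in that example is throughput times commit interval, which makes the operating envelope easy to explore. A minimal sketch (class and method names are illustrative); the 100 ms case uses Kafka Streams' default commit.interval.ms when exactly-once processing is enabled, for contrast with the 1-hour case.

```java
// Retained data if the application keeps every record until
// commitTransaction() succeeds: throughput x commit interval.
final class CommitBufferSketch {

    static long retainedBytes(long bytesPerSecond, long commitIntervalMs) {
        return bytesPerSecond * commitIntervalMs / 1000;
    }

    public static void main(String[] args) {
        long oneMB = 1_000_000L;
        // 1 MB/s with a 1 hour commit interval
        System.out.println(retainedBytes(oneMB, 3_600_000L) / oneMB + " MB"); // prints "3600 MB"
        // 1 MB/s with a 100 ms commit interval
        System.out.println(retainedBytes(oneMB, 100L) / 1_000 + " KB");      // prints "100 KB"
    }
}
```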
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Greg
> > >>>>>
> > >>>>> On Mon, Jul 15, 2024 at 1:29 PM Greg Harris <greg.har...@aiven.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>>
> > >>>>>> Thanks for your response.
> > >>>>>>
> > >>>>>>> what does a user do
> > >>>>>>> after a transaction has failed due to a `too-large-record` exception?
> > >>>>>>> They will submit the same batch without the problematic record again.
> > >>>>>>
> > >>>>>> If they re-submit the same record, they are indicating that this record
> > >>>>>> is an integral part of the transaction, and the transaction should only
> > >>>>>> be committed with it present. If the record isn't integral to the
> > >>>>>> transaction, they shouldn't submit it as part of the transaction.
> > >>>>>>
> > >>>>>>> Regarding your solution to solve the issue application-side: I am a
> > >>>>>>> bit hesitant to keep all sent records in memory since I think
> > >>>>>>> buffering records twice (both in Streams and Producer) would not be
> > >>>>>>> an efficient solution.
> > >>>>>>
> > >>>>>> I understand your hesitation, and this touches on the "performance"
> > >>>>>> caveat of the end-to-end arguments in system design. There are no
> > >>>>>> perfect designs, and some API cleanliness may be sacrificed in favor of
> > >>>>>> more performant solutions. You would need to make a concrete and
> > >>>>>> convincing argument that the performance of this solution would be
> > >>>>>> better than every alternative. To that end, I would recommend that you
> > >>>>>> add more to the "Rejected Alternatives" section, as that is going to
> > >>>>>> carry this proposal.
> > >>>>>> Some alternatives that I can think of, but which aren't necessarily
> > >>>>>> better:
> > >>>>>> 1. Streams users that specifically want to drop oversize records can
> > >>>>>> estimate the size of their data and drop records which are too
> > >>>>>> large, enforcing their own limits that are lower than the Kafka limits.
> > >>>>>> 2. Streams users that want CONTINUE semantics can use at_least_once
> > >>>>>> semantics
> > >>>>>> 3. Streams itself can store record hashes/coordinates and fast rewind
> > >>>>>> to the end of the last transaction, recomputing data rather than
> > >>>>>> storing it.
> > >>>>>> 4. Streams can define exactly_once + CONTINUE semantics to permit the
> > >>>>>> whole transaction to be dropped, because it would allow the next batch
> > >>>>>> to be started processing.
> > >>>>>> 5. Streams can emit records with both a transactional and
> > >>>>>> non-transactional producer if some records are not critical-path
> > >>>>>>
> > >>>>>> To generalize this point: Suppose an application tries to minimize
> > >>>>>> storage costs by having only one party responsible for a piece of data
> > >>>>>> at a time. They initially have the data, call send(), and want to know
> > >>>>>> the earliest time they can forget the data and transfer the
> > >>>>>> responsibility to Kafka.
> > >>>>>> With a non-transactional producer, they are responsible for the data
> > >>>>>> until the send() callback has succeeded. With a transactional
> > >>>>>> producer, they are responsible for the data until commitTransaction()
> > >>>>>> has succeeded.
> > >>>>>> With this proposed change that makes the producer tolerate
> > >>>>>> too-large exceptions, applications are still responsible for storing
> > >>>>>> their data until commitTransaction() has succeeded, because
> > >>>>>> abortTransaction() could have also been called, or the producer could
> > >>>>>> have been fenced, or any number of other failures could have occurred.
> > >>>>>> This feature does not enable Streams to drop responsibility earlier;
> > >>>>>> it carves out a specific situation in which it doesn't have to rewind
> > >>>>>> processing, which is a performance concern.
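The responsibility rule can be sketched as a small application-side buffer. ReplayBuffer is a hypothetical helper, not part of Kafka or Kafka Streams; it only makes explicit when data can be released: on a successful commitTransaction() and not earlier, regardless of how individual send errors are handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical application-side buffer illustrating the rule above: with a
// transactional producer, records may only be forgotten once
// commitTransaction() has succeeded.
final class ReplayBufferSketch {

    static final class ReplayBuffer<T> {
        private final List<T> uncommitted = new ArrayList<>();

        // Called alongside producer.send(): the app is still responsible.
        void onSend(T record) {
            uncommitted.add(record);
        }

        // commitTransaction() succeeded: responsibility has moved to Kafka.
        void onCommitSucceeded() {
            uncommitted.clear();
        }

        // Abort, fencing, or any other failure: everything since the last
        // successful commit must be reprocessed or re-sent.
        List<T> onAbort() {
            List<T> toResend = new ArrayList<>(uncommitted);
            uncommitted.clear();
            return toResend;
        }

        int size() {
            return uncommitted.size();
        }
    }

    public static void main(String[] args) {
        ReplayBuffer<String> buffer = new ReplayBuffer<>();
        buffer.onSend("a");
        buffer.onSend("b");
        buffer.onCommitSucceeded();
        buffer.onSend("c");
        System.out.println(buffer.onAbort()); // prints [c]
    }
}
```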
> > >>>>>>
> > >>>>>> For Streams and the general case, if an application is trying to
> > >>>>>> optimize storage costs, they should optimize for smaller transactions,
> > >>>>>> because this both lowers the bound on record re-delivery and lowers the
> > >>>>>> likelihood of a bad record being included in any individual
> > >>>>>> transaction.
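To quantify the second half of that argument, assume (this independence assumption is ours, not from the thread) that each record is independently bad with probability p. A transaction of n records then contains at least one bad record with probability 1 - (1 - p)^n, which grows with n, while the work to redo on abort grows linearly with n. The value of p below is purely illustrative.

```java
// Assumes each record is independently "bad" (e.g. oversize) with probability p.
final class BadRecordOddsSketch {

    // P(at least one bad record among n) = 1 - (1 - p)^n
    static double pAtLeastOneBad(double p, int n) {
        return 1.0 - Math.pow(1.0 - p, n);
    }

    public static void main(String[] args) {
        double p = 1e-6; // one bad record per million, purely illustrative
        for (int n : new int[] {100, 10_000, 1_000_000}) {
            System.out.printf("n=%d -> %.6f%n", n, pAtLeastOneBad(p, n));
        }
    }
}
```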
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Greg
> > >>>>>>
> > >>>>>> On Mon, Jul 15, 2024 at 12:35 PM Artem Livshits
> > >>>>>> <alivsh...@confluent.io.invalid> wrote:
> > >>>>>>
> > >>>>>>> Hi Greg,
> > >>>>>>>
> > >>>>>>> What you say makes a lot of sense. I just wanted to clarify a couple
> > >>>>>>> of subtle points.
> > >>>>>>>
> > >>>>>>> AL1. There is a functional reason to handle errors that happen on
> > >>>>>>> send (originating in the producer logic in the client) differently
> > >>>>>>> from errors that are returned from the broker. The problem is that
> > >>>>>>> RecordTooLargeException is returned in two cases: (1) the producer
> > >>>>>>> logic on the client checks that the record is too large and throws
> > >>>>>>> the exception before doing anything with it -- this is a very
> > >>>>>>> "clean" situation with one specific record being marked as a "poison
> > >>>>>>> pill" and rejected; (2) the broker throws the same error if the batch
> > >>>>>>> is too large -- the batch may include multiple records and none of
> > >>>>>>> them would necessarily be a "poison pill" record; it's just a random
> > >>>>>>> misconfiguration of client vs. broker. Because of this, a more
> > >>>>>>> "complete" solution that allows ignoring RecordTooLargeException
> > >>>>>>> regardless of its origin is actually incorrect, while a "partial"
> > >>>>>>> solution that allows ignoring RecordTooLargeException only
> > >>>>>>> originating in client code accomplishes the required functionality.
> > >>>>>>> This is an important nuance and should be added to the KIP.
> > >>>>>>> Obviously, we could solve this problem by changing logic in the
> > >>>>>>> broker to return a different error when the batch is too large, but
> > >>>>>>> right now this is not the case (and to have the correct error
> > >>>>>>> handling we'd need to know the version of the broker so we can only
> > >>>>>>> drop the records if the error is returned from a broker that knows to
> > >>>>>>> return a different error).
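A toy illustration of the two origins, using plain byte counts and ignoring framing overhead. The limits are parameters; in Kafka the client-side per-record check is governed by the producer's max.request.size (default 1048576 bytes) and the broker-side check by message.max.bytes or the topic-level max.message.bytes. The broker limit and record sizes below are made up: every record passes the client check, yet the batch they form (for example, when the producer is allowed to build batches larger than the broker accepts) is rejected, so no single record is the culprit.

```java
import java.util.List;

// Toy illustration only: sizes are plain byte counts and framing overhead is ignored.
final class TooLargeOriginSketch {

    // Client-side origin: one specific record exceeds the producer's limit,
    // so that record is a genuine "poison pill".
    static boolean clientRejects(int recordBytes, int producerLimitBytes) {
        return recordBytes > producerLimitBytes;
    }

    // Broker-side origin: the limit applies to the whole batch, so the error
    // does not identify any single bad record.
    static boolean brokerRejects(List<Integer> batchRecordBytes, int brokerLimitBytes) {
        long total = 0;
        for (int bytes : batchRecordBytes) {
            total += bytes;
        }
        return total > brokerLimitBytes;
    }

    public static void main(String[] args) {
        int producerLimit = 1_048_576; // default max.request.size
        int brokerLimit = 1_000_000;   // made-up broker-side limit
        List<Integer> batch = List.of(300_000, 300_000, 300_000, 300_000);

        boolean anyRecordRejected = batch.stream()
                .anyMatch(bytes -> clientRejects(bytes, producerLimit));
        System.out.println("client rejects a record: " + anyRecordRejected);                  // prints false
        System.out.println("broker rejects the batch: " + brokerRejects(batch, brokerLimit)); // prints true
    }
}
```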
> > >>>>>>>
> > >>>>>>> AL2. In a high performance system, "just an optimization" can be a
> > >>>>>>> functional requirement -- if a solution impacts memory or
> > >>>>>>> computational complexity (in the sense of big-O notation) on the main
> > >>>>>>> code path, I can justify changing APIs to avoid such an impact. I'll
> > >>>>>>> let KStream folks comment on whether an implementation that requires
> > >>>>>>> storing records in memory actually violates the computational
> > >>>>>>> complexity on the main code path; I just wanted to make the point
> > >>>>>>> that we shouldn't necessarily dismiss API changes that allow for
> > >>>>>>> optimizations.
> > >>>>>>>
> > >>>>>>> -Artem
> > >>>>>>>
> > >>>>>>> On Fri, Jul 12, 2024 at 1:07 PM Greg Harris <greg.har...@aiven.io.invalid>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> Alieh, thanks for the KIP! And everyone else, thanks for the robust
> > >>>>>>>> discussion.
> > >>>>>>>>
> > >>>>>>>> I understand that there are situations in which users want the pipeline
> > >>>>>>>> to "just keep working" and skip errors. However, I question whether it
> > >>>>>>>> is appropriate to support/encourage this behavior via inclusion in the
> > >>>>>>>> Producer API.
> > >>>>>>>> This feature is essentially a "non-atomic transaction", as it allows
> > >>>>>>>> commits in which not all records passed to send() ultimately get
> > >>>>>>>> committed. As atomicity is one of the most important semantics
> > >>>>>>>> associated with transactions, I question whether there are users other
> > >>>>>>>> than Streams that would choose non-atomic transactions over a
> > >>>>>>>> traditional/idempotent producer.
> > >>>>>>>> Some cursory research shows that non-atomic transactions may be present
> > >>>>>>>> in other databases, but they are actively discouraged due to the
> > >>>>>>>> complexity they add to error handling. [1]
> > >>>>>>>>
> > >>>>>>>> I'd like to invoke the End-to-End Arguments in System Design [2] here,
> > >>>>>>>> and recommend that this behavior may be present in Streams but should
> > >>>>>>>> not be in the Producer.
> > >>>>>>>> 1. Dropping records that cause errors is already expressible via the
> > >>>>>>>> current Producer API. You can store the records in memory after calling
> > >>>>>>>> send(), wait for a successful, error-free flush() before calling
> > >>>>>>>> commitTransaction(), and only then allow the records to be garbage
> > >>>>>>>> collected. If errors occur, abortTransaction() and re-submit the records.
> > >>>>>>>> 2. Implementing this inside the Producer API is complex and difficult to
> > >>>>>>>> holistically define in a way that we won't regret or need to change
> > >>>>>>>> later. I think some of the disagreement in this thread originates from
> > >>>>>>>> this, and I don't find the proposed API satisfactory.
> > >>>>>>>> 3. The performance improvement of including this change in the lower
> > >>>>>>>> level needs to be quantified in order to be a justification, and I don't
> > >>>>>>>> see any analysis of this.
> > >>>>>>>>
> > >>>>>>>> I imagine that the alternative implementation I suggested in (1) would
> > >>>>>>>> also enable more expressive error handlers in Streams, if such a thing
> > >>>>>>>> were desired. Keeping the record around until after the transaction is
> > >>>>>>>> committed would enable a DLQ or passing the erroneous record to the
> > >>>>>>>> error handler.
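The buffering alternative in (1) might look roughly like the following minimal sketch, with failed records handed to a handler such as a DLQ writer. `TxProducer` and `BufferedTxSender` are hypothetical. The interface's method names mirror KafkaProducer's beginTransaction(), send(), flush(), commitTransaction() and abortTransaction(). However, the real send() returns a Future<RecordMetadata> rather than a CompletableFuture. The sketch also assumes every send failure is record-specific; a transient or fatal error would need different handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a transactional producer; method names mirror
// KafkaProducer, but send() returns a CompletableFuture for simplicity.
interface TxProducer<R> {
    void beginTransaction();
    CompletableFuture<Void> send(R record);
    void flush();
    void commitTransaction();
    void abortTransaction();
}

final class BufferedTxSender {
    // Keeps all records in memory until a transaction containing only
    // successfully sent records commits. Records whose send failed are passed
    // to onBadRecord (e.g. a DLQ writer) and excluded from the retry.
    // Assumes every send failure is record-specific.
    static <R> List<R> sendAll(TxProducer<R> producer, List<R> records, Consumer<R> onBadRecord) {
        List<R> pending = new ArrayList<>(records);
        while (true) {
            producer.beginTransaction();
            List<CompletableFuture<Void>> results = new ArrayList<>();
            for (R record : pending) {
                results.add(producer.send(record));
            }
            producer.flush(); // wait until every send in this round has completed

            List<R> succeeded = new ArrayList<>();
            List<R> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                if (results.get(i).isCompletedExceptionally()) {
                    failed.add(pending.get(i));
                } else {
                    succeeded.add(pending.get(i));
                }
            }

            if (failed.isEmpty()) {
                producer.commitTransaction(); // atomic: all pending records or none
                return pending;
            }
            producer.abortTransaction();      // discard the partial transaction
            failed.forEach(onBadRecord);
            pending = succeeded;               // shrinks every round, so the loop terminates
        }
    }
}
```

Each commit stays atomic, because a transaction only commits when every record in it was sent successfully. The cost is that every record stays in memory until the commit, which is the memory question raised in AL2 above.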
> > >>>>>>>>
> > >>>>>>>> I think that the current pattern of the application being responsible
> > >>>>>>>> for providing good data to the producer is very reasonable; having the
> > >>>>>>>> producer responsible for implementing the application's error handling
> > >>>>>>>> of bad data is not something I can support.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Greg
> > >>>>>>>>
> > >>>>>>>> [1] https://www.sommarskog.se/error_handling/Part1.html
> > >>>>>>>> [2] https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf
> > >>>>>>>>
> > >>>>>>>> On Fri, Jul 12, 2024 at 8:52 AM Justine Olshan
> > >>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Can we update the KIP to clearly document these decisions?
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>>
> > >>>>>>>>> Justine
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Jul 9, 2024 at 9:25 AM Andrew Schofield <andrew_schofi...@live.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Chris,
> > >>>>>>>>>> As it stands, the error handling for transactions in KafkaProducer is
> > >>>>>>>>>> not ideal. There’s no reason why a failed operation should fail a
> > >>>>>>>>>> transaction, provided that the application can tell that the operation
> > >>>>>>>>>> was not included in the transaction and then make its own decision
> > >>>>>>>>>> whether to continue or back out. So, I think I disagree with the
> > >>>>>>>>>> original premise of a client-side error state for a transaction, but we
> > >>>>>>>>>> are where we are.
> > >>>>>>>>>>
> > >>>>>>>>>> When I voted, I did not expect the KIP to handle ALL errors which could
> > >>>>>>>>>> conceivably be handled. I did expect it to handle client-side send
> > >>>>>>>>>> errors that would cause a record to be rejected from a batch before
> > >>>>>>>>>> sending to a broker. I think that it does make the KafkaProducer
> > >>>>>>>>>> interface very slightly more complicated, but the new option is a clear
> > >>>>>>>>>> improvement and I don’t see anyone getting into a mess using it.
> > >>>>>>>>>>
> > >>>>>>>>>> I think broker-side errors are more tricky and I don’t think an overload
> > >>>>>>>>>> on the send() method is going to do the job. I don’t see that as a
> > >>>>>>>>>> problem with the KIP, just that the underlying RPCs and behaviour are
> > >>>>>>>>>> not very amenable to record-specific error handling. The Produce RPC is
> > >>>>>>>>>> a complicated beast which can include a set of records for multiple
> > >>>>>>>>>> topic-partitions. Although ProduceResponse v10 does include record
> > >>>>>>>>>> errors, I don’t believe this is surfaced in the client. Let’s imagine
> > >>>>>>>>>> something like broker-side record validation which barfs on one record.
> > >>>>>>>>>> Failing an entire batch is easier, but less useful if the problem is
> > >>>>>>>>>> related to one record.
> > >>>>>>>>>>
> > >>>>>>>>>> In summary, I’m happy that my vote stands, and I am happy with the KIP
> > >>>>>>>>>> only supporting client-side record errors.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Andrew
> > >>>>>>>>>>
> > >>>>>>>>>>> On 8 Jul 2024, at 16:37, Chris Egerton <chr...@aiven.io.INVALID>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Can you clarify why broker-side errors shouldn't be covered? The only
> > >>>>>>>>>>> real rationale I can come up with is that it's easier to implement.
> > >>>>>>>>>>>
> > >>>>>>>>>>> "Things were better for Kafka Streams before KAFKA-9279 was fixed" isn't
> > >>>>>>>>>>> very convincing, because Kafka Streams is not the only user of the Java
> > >>>>>>>>>>> producer client. And for others, especially new users, I doubt that this
> > >>>>>>>>>>> new API we're proposing would make sense without having to consult a lot
> > >>>>>>>>>>> of historical context.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I also don't think that most users will know or even care about the
> > >>>>>>>>>>> distinction between errors that cause a record to fail before it's added
> > >>>>>>>>>>> to a batch vs. after. If you were writing a producer application of your
> > >>>>>>>>>>> own, and you wanted to handle RecordTooLargeException instances by
> > >>>>>>>>>>> dropping a record without aborting a transaction, would you care about
> > >>>>>>>>>>> whether it was your client or your broker that balked? Would you be
> > >>>>>>>>>>> happy if you wrote logic expecting that that problem was solved once and
> > >>>>>>>>>>> for all, only to learn that it could still affect you in other
> > >>>>>>>>>>> circumstances? Or, alternatively, would you be happy if you wanted to
> > >>>>>>>>>>> solve that problem and found an API that seemed to do exactly what you
> > >>>>>>>>>>> wanted, but after reading the fine print, realized you'd have to do it
> > >>>>>>>>>>> yourself instead?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Ultimately, the more I think about this, the more I believe that we're
> > >>>>>>>>>>> adding noise to the API (with the new overloaded variant of send) for a
> > >>>>>>>>>>> feature that will likely bring confusion and even frustration to anyone
> > >>>>>>>>>>> besides maintainers of Kafka Streams who tries to use it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> If the only concern about covering broker-side errors is that it would
> > >>>>>>>>>>> be more difficult to implement, I believe we should strongly reconsider
> > >>>>>>>>>>> that alternative. That said, if there is a straightforward way to
> > >>>>>>>>>>> explain this feature to new users that won't mislead them or require
> > >>>>>>>>>>> them to do research on producer internals, then I can still live with
> > >>>>>>>>>>> it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Regarding a list of recoverable vs. irrecoverable errors, this is
> > >>>>>>>>>>> actually the subject of another recently-introduced KIP:
> > >>>>>>>>>>>
> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > >>>>>>>>>>>
> > >>>>>>>>>>> Finally, I'd also like to ask the people who have already voted (Andrew,
> > >>>>>>>>>>> Matthias) if, at the time they voted, they believed that the API would
> > >>>>>>>>>>> handle all errors, or only the subset of errors that would cause a
> > >>>>>>>>>>> record to be rejected from a batch before it can be sent to a broker.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Chris
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Jul 4, 2024 at 12:43 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>> Hi from the KIP’s author,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Clarifying two points:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) Broker-side errors:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> As far as I remember, we are not going to cover the errors originating
> > >>>>>>>>>>>> from the broker!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> A historical fact: one of the debate points in KIP-1038 was that, by
> > >>>>>>>>>>>> defining a custom producer handler, the user may assume that broker-side
> > >>>>>>>>>>>> errors are covered as well. They may define a handler for
> > >>>>>>>>>>>> `RecordTooLargeException` and still see such errors not being handled as
> > >>>>>>>>>>>> they wish.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) Regarding irrecoverable/recoverable errors:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Before the fix of `KAFKA-9279`, errors such as `RecordTooLargeException`
> > >>>>>>>>>>>> or errors related to missing metadata (both originating from Producer
> > >>>>>>>>>>>> `send()`) were considered recoverable, but after the fix they became
> > >>>>>>>>>>>> irrecoverable without any change to the Javadocs or any KIP. All the
> > >>>>>>>>>>>> effort made in this KIP and the former one has been aimed at returning
> > >>>>>>>>>>>> to the former state.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I am sure it is clear to you which sort of errors we are going to cover:
> > >>>>>>>>>>>> a single record may fail to get added to the batch due to issues with
> > >>>>>>>>>>>> the record or its corresponding topic. The point was that if the record
> > >>>>>>>>>>>> is not added to the batch, let’s not fail the whole batch because of
> > >>>>>>>>>>>> that non-existent record. We never intended to do anything on the
> > >>>>>>>>>>>> broker side or to ignore more important errors. But I agree with you,
> > >>>>>>>>>>>> Chris: if we are adding a new API, we must have good documentation for
> > >>>>>>>>>>>> it. The sentence `all irrecoverable transactional errors will still be fatal`,
> > >>>>>>>>>>>> as you suggested, is good. What do you think? I am totally against
> > >>>>>>>>>>>> enumerating errors in Javadocs, since these sorts of errors can change
> > >>>>>>>>>>>> over time. Moreover, have you ever seen a list of recoverable or
> > >>>>>>>>>>>> irrecoverable errors anywhere so far?
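The two behaviours being compared can be shown with a toy model. All names in it are hypothetical, and only client-side size validation is modelled. `SendErrorPolicy.IGNORE_SEND_ERRORS` borrows the option name quoted from the KIP draft further down in this thread, but it is not a real producer API. Under the default policy, a record rejected by client-side validation leaves the transaction unable to commit, as described for the post-KAFKA-9279 behaviour. Under the opt-in policy, the record is never added to the batch and the remaining records commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical policy names; IGNORE_SEND_ERRORS echoes the KIP draft wording.
enum SendErrorPolicy { FAIL_TRANSACTION, IGNORE_SEND_ERRORS }

// Toy model of one transaction; only client-side size validation is modelled.
final class TxnModel {
    private final int maxRecordBytes;
    private final List<Integer> batch = new ArrayList<>();
    private boolean mustAbort = false;

    TxnModel(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Returns true if the record was added to the batch.
    boolean send(int recordBytes, SendErrorPolicy policy) {
        if (recordBytes > maxRecordBytes) {
            // The record is rejected before it ever joins a batch.
            if (policy == SendErrorPolicy.FAIL_TRANSACTION) {
                mustAbort = true; // the whole transaction can no longer commit
            }
            return false;
        }
        batch.add(recordBytes);
        return true;
    }

    // Returns the committed records, or throws if the transaction must abort.
    List<Integer> commit() {
        if (mustAbort) {
            throw new IllegalStateException("transaction must be aborted");
        }
        return List.copyOf(batch);
    }
}
```

In this model the only difference between the policies is whether a rejected record also poisons the transaction. In both cases, the rejected record itself is never part of the batch.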
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Wed, Jul 3, 2024 at 6:07 PM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I agree that enumerating a list of errors that should be covered by the
> > >>>>>>>>>>>>> KIP is difficult; I was thinking it might be easier if we list the
> > >>>>>>>>>>>>> errors that should _not_ be covered by the KIP, and only if we can't
> > >>>>>>>>>>>>> define a reasonable heuristic that would cover them without having to
> > >>>>>>>>>>>>> explicitly list them. Could it be enough to say "all irrecoverable
> > >>>>>>>>>>>>> transactional errors will still be fatal", or even just "all
> > >>>>>>>>>>>>> transactional errors (as opposed to errors related to this specific
> > >>>>>>>>>>>>> record) will still be fatal"?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, Jul 3, 2024 at 11:56 AM Justine Olshan
> > >>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think what you say makes sense. I agree that defining the behavior
> > >>>>>>>>>>>>>> based on code that can possibly change is not a good idea, and I was
> > >>>>>>>>>>>>>> trying to get a clearer definition from the KIP's author :)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think it can always be hard to ensure that only specific errors are
> > >>>>>>>>>>>>>> handled unless they are explicitly enumerated in code, as the code can
> > >>>>>>>>>>>>>> change and can be changed by folks who are not aware of this KIP or
> > >>>>>>>>>>>>>> conversation.
> > >>>>>>>>>>>>>> I personally don't have the bandwidth to do this definition/enumeration
> > >>>>>>>>>>>>>> of errors, so hopefully Alieh can expand upon this.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, Jul 3, 2024 at 8:28 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I don't love defining the changes for this KIP in terms of a catch
> > >>>>>>>>>>>>>>> clause in the KafkaProducer class, for two reasons. First, the set of
> > >>>>>>>>>>>>>>> errors that are handled by that clause may shift over time as the code
> > >>>>>>>>>>>>>>> base is modified, and second, it would be fairly opaque to users who
> > >>>>>>>>>>>>>>> want to understand whether an error would be affected by using this API
> > >>>>>>>>>>>>>>> or not.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> It also seems strange that we'd handle some types of
> > >>>>>>>>>>>>>>> RecordTooLargeException (i.e., ones reported client-side) with this API,
> > >>>>>>>>>>>>>>> but not others (i.e., ones reported by a broker).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think this kind of API would be most powerful, most intuitive to
> > >>>>>>>>>>>>>>> users, and easiest to document if we expanded the scope to all
> > >>>>>>>>>>>>>>> record-send-related errors, except anything indicating issues with
> > >>>>>>>>>>>>>>> exactly-once semantics. That would include records that are too large
> > >>>>>>>>>>>>>>> (when caught both client- and server-side), records that can't be sent
> > >>>>>>>>>>>>>>> due to authorization failures, records sent to nonexistent topics/topic
> > >>>>>>>>>>>>>>> partitions, and keyless records sent to compacted topics. It would not
> > >>>>>>>>>>>>>>> include ProducerFencedException, InvalidProducerEpochException,
> > >>>>>>>>>>>>>>> UnsupportedVersionException, and possibly others.
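The two scopes under discussion can be contrasted with a small classifier. The enum constants are hypothetical labels for the cases listed above, not Kafka exception types. A real implementation would inspect exceptions such as RecordTooLargeException or ProducerFencedException instead. The narrow scope uses only the thread's running example of an oversized record rejected client-side.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical labels for the cases discussed above (not Kafka exception types).
enum SendErrorKind {
    RECORD_TOO_LARGE_CLIENT,      // oversized record rejected by the client
    RECORD_TOO_LARGE_BROKER,      // oversized record/batch rejected by a broker
    TOPIC_AUTHORIZATION_FAILED,   // principal lacks ACLs for the topic
    UNKNOWN_TOPIC_OR_PARTITION,   // topic or partition does not exist
    NULL_KEY_FOR_COMPACTED_TOPIC, // keyless record sent to a compacted topic
    PRODUCER_FENCED,              // cf. ProducerFencedException
    INVALID_PRODUCER_EPOCH,       // cf. InvalidProducerEpochException
    UNSUPPORTED_VERSION           // cf. UnsupportedVersionException
}

final class IgnorableErrors {
    // Errors signalling a problem with exactly-once semantics: fatal in both scopes.
    static final Set<SendErrorKind> ALWAYS_FATAL = EnumSet.of(
            SendErrorKind.PRODUCER_FENCED,
            SendErrorKind.INVALID_PRODUCER_EPOCH,
            SendErrorKind.UNSUPPORTED_VERSION);

    // Narrow scope, using only the thread's running example.
    static boolean ignorableNarrow(SendErrorKind kind) {
        return kind == SendErrorKind.RECORD_TOO_LARGE_CLIENT;
    }

    // Broad scope: every record-send error except the exactly-once ones.
    static boolean ignorableBroad(SendErrorKind kind) {
        return !ALWAYS_FATAL.contains(kind);
    }
}
```

One way to address the concern above about a catch clause's coverage shifting as the code base changes is to write the exclusions as an explicit, documented set.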
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Justine -- do you think it would be possible to develop either a better
> > >>>>>>>>>>>>>>> definition for the kinds of "excluded" errors that should not be covered
> > >>>>>>>>>>>>>>> by this API, or, barring that, a comprehensive list of exact error
> > >>>>>>>>>>>>>>> types? And do you think this would be acceptable in terms of risk and
> > >>>>>>>>>>>>>>> complexity?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 5:05 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> About the consequences: they will be the same as when we did not have
> > >>>>>>>>>>>>>>>> the fix made in `KAFKA-9279`: silent loss of data! Obviously, when the
> > >>>>>>>>>>>>>>>> user intentionally chooses to ignore errors, that loss would not be
> > >>>>>>>>>>>>>>>> silent any more. Right?
> > >>>>>>>>>>>>>>>> Of course, considering all types of `ApiException`s would be too broad.
> > >>>>>>>>>>>>>>>> But are the exceptions caught in `catch(ApiException e)` of the
> > >>>>>>>>>>>>>>>> `doSend()` method also too broad?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> -Alieh
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 9:45 PM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If we want to allow any error to be ignored, we should probably run
> > >>>>>>>>>>>>>>>>> through all the errors to make sure they make sense.
> > >>>>>>>>>>>>>>>>> I just want to feel confident that we aren't just making a decision
> > >>>>>>>>>>>>>>>>> without considering the consequences carefully.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:30 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Yes, we talked about `RecordTooLargeException` as an example, but did we
> > >>>>>>>>>>>>>>>>>> ever limit ourselves to only this specific exception? I don't think so,
> > >>>>>>>>>>>>>>>>>> neither in the KIP nor in the PR. As Chris mentioned, this KIP is going
> > >>>>>>>>>>>>>>>>>> to undo what we have done in `KAFKA-9279` in case 1) the user is in a
> > >>>>>>>>>>>>>>>>>> transaction and 2) they decide to ignore the errors in which the record
> > >>>>>>>>>>>>>>>>>> was not even added to the batch. Yes, we suggested some methods for
> > >>>>>>>>>>>>>>>>>> undoing or, in fact, moving the transaction back out of the error state
> > >>>>>>>>>>>>>>>>>> in `flush` or in `commitTransaction`, and we finally came to the idea of
> > >>>>>>>>>>>>>>>>>> not even making those changes in `send` (which is better than undoing
> > >>>>>>>>>>>>>>>>>> them).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 8:03 PM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hey folks,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I understand where you are coming from by asking for specific use cases.
> > >>>>>>>>>>>>>>>>>>> My understanding based on previous conversations was that there were a
> > >>>>>>>>>>>>>>>>>>> few different errors that have been seen.
> > >>>>>>>>>>>>>>>>>>> One example I heard some information about was when the record was too
> > >>>>>>>>>>>>>>>>>>> large and it failed the batch. Besides that, I'm not really sure if
> > >>>>>>>>>>>>>>>>>>> there are cases in mind, though it is fair to ask about those and bring
> > >>>>>>>>>>>>>>>>>>> them up.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Does a record qualify as a poison pill if it targets a topic that
> > >>>>>>>>>>>>>>>>>>>> doesn't exist? Or if it targets a topic that the producer principal
> > >>>>>>>>>>>>>>>>>>>> lacks ACLs for? What if it fails broker-side validation (e.g., has a
> > >>>>>>>>>>>>>>>>>>>> null key for a compacted topic)?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I think there was some parallel work on addressing the
> > >>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionError in another way. As for the other checks
> > >>>>>>>>>>>>>>>>>>> (ACLs, validation, etc.), I am not aware of those being in Alieh's
> > >>>>>>>>>>>>>>>>>>> scope, but we should be clear about exactly what we are doing.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Including all errors that fall into ApiException seems too broad to me.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 10:51 AM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>>>>>>>>>> thanks for sharing your concerns.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 1) About the language of the KIP (or maybe later in Javadocs): is it
> > >>>>>>>>>>>>>>>>>>>> alright if I write that it covers all errors that fall into the
> > >>>>>>>>>>>>>>>>>>>> `ApiException` category thrown (actually returned) by the Producer?
> > >>>>>>>>>>>>>>>>>>>> 2) About future expansion: do you have any better suggestions for enum
> > >>>>>>>>>>>>>>>>>>>> names? Do you think `IGNORE_API_EXCEPTIONS` or something like that is a
> > >>>>>>>>>>>>>>>>>>>> "better/more accurate" one?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 7:29 PM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Alieh and Justine,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm concerned that we're settling on a definition of "poison pill"
> > >>>>>>>>>>>>>>>>>>>>> that's easiest to tackle right now but may lead to shortcomings down the
> > >>>>>>>>>>>>>>>>>>>>> road. I understand the relationship between this KIP and KAFKA-9279, and
> > >>>>>>>>>>>>>>>>>>>>> I can totally get behind the desire to keep things small, focused, and
> > >>>>>>>>>>>>>>>>>>>>> simple in the name of avoiding bugs. However, what I don't think is
> > >>>>>>>>>>>>>>>>>>>>> clear at all is what the "specific circumstances" are that Justine
> > >>>>>>>>>>>>>>>>>>>>> mentioned. I had a drastically different idea of what the intended
> > >>>>>>>>>>>>>>>>>>>>> behavioral change would be before looking at the draft PR.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I would like 1) for us to be clearer about the categories of errors that
> > >>>>>>>>>>>>>>>>>>>>> we want to cover with this new API (especially since we'll have to find
> > >>>>>>>>>>>>>>>>>>>>> a clear, succinct way to document this for users), and 2) to make sure
> > >>>>>>>>>>>>>>>>>>>>> that, if we do try to expand this API in the future, we won't be
> > >>>>>>>>>>>>>>>>>>>>> painted into a corner.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> For item 1, hopefully we can agree that the language in the KIP for
> > >>>>>>>>>>>>>>>>>>>>> IGNORE_SEND_ERRORS ("The records causing irrecoverable errors are
> > >>>>>>>>>>>>>>>>>>>>> excluded from the batch and the transaction is committed successfully.")
> > >>>>>>>>>>>>>>>>>>>>> is pretty vague. If we start using the phrase "poison pill record" that
> > >>>>>>>>>>>>>>>>>>>>> could help, but IMO more detail would still be needed. We know that we
> > >>>>>>>>>>>>>>>>>>>>> want to include records that are so large that they can be immediately
> > >>>>>>>>>>>>>>>>>>>>> rejected by the producer. But there are other cases that users might
> > >>>>>>>>>>>>>>>>>>>>> expect to be handled. Does a record qualify as a poison pill if it
> > >>>>>>>>>>>>>>>>>>>>> targets a topic that doesn't exist? Or if it targets a topic that the
> > >>>>>>>>>>>>>>>>>>>>> producer principal lacks ACLs for? What if it fails broker-side
> > >>>>>>>>>>>>>>>>>>>>> validation (e.g., has a null key for a compacted topic)?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> For item 2, this really depends on how narrow the scope of what we're
> > >>>>>>>>>>>>>>>>>>>>> doing right now is. If we only handle a subset of the examples I laid
> > >>>>>>>>>>>>>>>>>>>>> out above that could possibly be considered poison pills with this KIP,
> > >>>>>>>>>>>>>>>>>>>>> do we want to lock ourselves in to never addressing more in the future,
> > >>>>>>>>>>>>>>>>>>>>> or can we choose an API (probably just enum names would be the only
> > >>>>>>>>>>>>>>>>>>>>> important decision here) that leaves room for more later?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:28 PM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Chris and Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> My understanding is that this KIP is really only trying to solve an
> > >>>>>>>>>>>>>>>>>>>>>> issue of a "poison pill" record that fails send().
> > >>>>>>>>>>>>>>>>>>>>>> We've talked a lot about having a generic framework for all errors,
> > >>>>>>>>>>>>>>>>>>>>>> but I don't think that is what this KIP is trying to do. Essentially
> > >>>>>>>>>>>>>>>>>>>>>> the request is to undo the change from KAFKA-9279
> > >>>>>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9279> but under
> > >>>>>>>>>>>>>>>>>>>>>> specific circumstances that are controlled. I really am concerned
> > >>>>>>>>>>>>>>>>>>>>>> about opening new avenues for bugs with EOS and hesitate to handle
> > >>>>>>>>>>>>>>>>>>>>>> any other types of errors.
> > >>>>>>>>>>>>>>>>>>>>>> I think if we all agree on the problem that we are trying to solve,
> > >>>>>>>>>>>>>>>>>>>>>> it is easier to agree on solutions.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Mon, Jul 1, 2024 at 2:20 AM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the valid points you mentioned. I updated the KIP and
> > >>>>>>>>>>>>>>>>>>>>>>> the PR with:
> > >>>>>>>>>>>>>>>>>>>>>>> 1) a note that the new overloaded `send` throws an
> > >>>>>>>>>>>>>>>>>>>>>>> `IllegalStateException` if the user tries to ignore `send()`
> > >>>>>>>>>>>>>>>>>>>>>>> errors outside of a transaction.
> > >>>>>>>>>>>>>>>>>>>>>>> 2) a default implementation in the `Producer` interface that throws
> > >>>>>>>>>>>>>>>>>>>>>>> an `UnsupportedOperationException`.
> > >>>>>>>>>>>>>>>>>>>>>>>
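The two API points above (an `IllegalStateException` when the option is used outside a transaction, and an `UnsupportedOperationException` from the interface default) can be sketched as follows. This is a minimal, self-contained illustration that assumes simplified stand-ins instead of kafka-clients types: `SketchProducer`, `SketchRecord`, `TxnSendOptionSketch`, `ToyTransactionalProducer` and `LegacyProducer` are hypothetical names, not the KIP's final API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical option type; the name and constant are illustrative only.
enum TxnSendOptionSketch { IGNORE_SEND_ERRORS }

// Minimal record stand-in so the sketch compiles without kafka-clients.
final class SketchRecord {
    final String topic;
    final String key;
    final String value;

    SketchRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

interface SketchProducer {
    CompletableFuture<Long> send(SketchRecord record);

    // New overload: a throwing default keeps existing implementations compiling
    // while making "not supported" explicit instead of silently doing nothing.
    default CompletableFuture<Long> send(SketchRecord record, TxnSendOptionSketch option) {
        throw new UnsupportedOperationException("send(record, option) is not supported");
    }
}

// An older implementation that never overrides the new overload.
final class LegacyProducer implements SketchProducer {
    @Override
    public CompletableFuture<Long> send(SketchRecord record) {
        return CompletableFuture.completedFuture(0L);
    }
}

// A toy implementation that only accepts the option inside a transaction.
final class ToyTransactionalProducer implements SketchProducer {
    private boolean inTransaction = false;
    private long nextOffset = 0;

    void beginTransaction() { inTransaction = true; }

    void commitTransaction() { inTransaction = false; }

    @Override
    public CompletableFuture<Long> send(SketchRecord record) {
        return CompletableFuture.completedFuture(nextOffset++);
    }

    @Override
    public CompletableFuture<Long> send(SketchRecord record, TxnSendOptionSketch option) {
        if (!inTransaction) {
            // Like other transactional methods, a TX-only option outside a TX is a usage error.
            throw new IllegalStateException("TxnSendOption requires an active transaction");
        }
        return send(record);
    }
}
```

A throwing default means third-party implementations fail loudly on the new call rather than returning an empty result.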
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Chris,
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. I tried to clarify the points you listed:
> > >>>>>>>>>>>>>>>>>>>>>>> -------> we've narrowed the scope from any error that might take
> > >>>>>>>>>>>>>>>>>>>>>>> place with producing a record to Kafka, to only the ones that are
> > >>>>>>>>>>>>>>>>>>>>>>> thrown directly from Producer::send;
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> From the very beginning, and even since KIP-1038, the main purpose
> > >>>>>>>>>>>>>>>>>>>>>>> was to have "more flexibility," or, in other words, "giving the user
> > >>>>>>>>>>>>>>>>>>>>>>> the authority" to handle some specific exceptions thrown from the
> > >>>>>>>>>>>>>>>>>>>>>>> `Producer`. Due to the specific cases we had in mind, KIP-1038 was
> > >>>>>>>>>>>>>>>>>>>>>>> discarded and we decided not to define a `CustomExceptionHandler`
> > >>>>>>>>>>>>>>>>>>>>>>> for `Producer` and instead treat the `send` failures in a different
> > >>>>>>>>>>>>>>>>>>>>>>> way. The main issue is that `send` makes a transition to error
> > >>>>>>>>>>>>>>>>>>>>>>> state, which is undoable. In fact, one single poison pill record
> > >>>>>>>>>>>>>>>>>>>>>>> makes the whole batch fail. The former suggestions that you agreed
> > >>>>>>>>>>>>>>>>>>>>>>> with have all been about un-doing this transition in `flush` or
> > >>>>>>>>>>>>>>>>>>>>>>> `commit`. The new suggestion is to un-do it (or better, NOT do it)
> > >>>>>>>>>>>>>>>>>>>>>>> in `send`, for the reasons listed in the discussions above.
> > >>>>>>>>>>>>>>>>>>>>>>> Moreover, I would say that having such a large scope as you
> > >>>>>>>>>>>>>>>>>>>>>>> mentioned is impossible. In the best case, we may have control over
> > >>>>>>>>>>>>>>>>>>>>>>> the `Producer`. What shall we do with the broker? The `any error
> > >>>>>>>>>>>>>>>>>>>>>>> that might take place with producing a record to Kafka` is too
> > >>>>>>>>>>>>>>>>>>>>>>> much, I think.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> -------> is this all we want to handle, and will it prevent us
> > >>>>>>>>>>>>>>>>>>>>>>> from handling more in the future in an intuitive way?
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I think yes. This is all we want. Other sorts of errors, such as
> > >>>>>>>>>>>>>>>>>>>>>>> problems with partition addition, producer fenced exceptions, etc.,
> > >>>>>>>>>>>>>>>>>>>>>>> seem to be more serious issues. The intention was to handle problems
> > >>>>>>>>>>>>>>>>>>>>>>> created by (maybe) a single poison pill record. BTW, I do not see
> > >>>>>>>>>>>>>>>>>>>>>>> any obstacles to future changes.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Sat, Jun 29, 2024 at 3:03 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Ah, sorry, spoke too soon. The PR doesn't handle errors thrown from
> > >>>>>>>>>>>>>>>>>>>>>>>> Producer::send; instead, it handles ApiException instances that are
> > >>>>>>>>>>>>>>>>>>>>>>>> caught inside KafkaProducer::doSend and reported by returning an
> > >>>>>>>>>>>>>>>>>>>>>>>> already-failed future. I think the same question still applies (is
> > >>>>>>>>>>>>>>>>>>>>>>>> this all we want to handle, and will it prevent us from handling
> > >>>>>>>>>>>>>>>>>>>>>>>> more in the future in an intuitive way), though.
> > >>>>>>>>>>>>>>>>>>>>>>>>
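The pattern Chris describes, where an exception raised while preparing a send is caught and handed back as an already-failed future instead of being thrown from `send()`, looks roughly like this. It is a hedged sketch of the general technique, not the code of KafkaProducer::doSend: `RecordLevelException`, `DoSendSketch` and the 16-byte `MAX_RECORD_BYTES` limit are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for a record-level client error (e.g. an ApiException subclass).
class RecordLevelException extends RuntimeException {
    RecordLevelException(String msg) { super(msg); }
}

final class DoSendSketch {
    static final int MAX_RECORD_BYTES = 16; // illustrative limit, not a Kafka default

    // A check that would otherwise throw synchronously is caught here and
    // reported through the returned future, so the caller sees it asynchronously.
    static CompletableFuture<Long> doSend(byte[] value) {
        try {
            if (value.length > MAX_RECORD_BYTES) {
                throw new RecordLevelException("record of " + value.length + " bytes exceeds limit");
            }
            return CompletableFuture.completedFuture(0L);
        } catch (RecordLevelException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

Callers then observe the failure when they `join()` or `get()` the future (wrapped in a `CompletionException` or `ExecutionException`), not as an exception from the send call itself.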
> > >>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 8:57 PM Chris Egerton <chr...@aiven.io>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> This KIP has evolved a lot since I last looked at it, but the
> > >>>>>>>>>>>>>>>>>>>>>>>>> changes seem well thought-out both in semantics and API. One
> > >>>>>>>>>>>>>>>>>>>>>>>>> clarifying question: based on the draft PR, it looks like we've
> > >>>>>>>>>>>>>>>>>>>>>>>>> narrowed the scope from any error that might take place with
> > >>>>>>>>>>>>>>>>>>>>>>>>> producing a record to Kafka, to only the ones that are thrown
> > >>>>>>>>>>>>>>>>>>>>>>>>> directly from Producer::send; is that the intended behavior here?
> > >>>>>>>>>>>>>>>>>>>>>>>>> And if so, do you have thoughts on how we might design a follow-up
> > >>>>>>>>>>>>>>>>>>>>>>>>> KIP that would catch all errors (including ones reported
> > >>>>>>>>>>>>>>>>>>>>>>>>> asynchronously instead of synchronously)? I'd like it if we could
> > >>>>>>>>>>>>>>>>>>>>>>>>> leave the door open for that without painting ourselves into too
> > >>>>>>>>>>>>>>>>>>>>>>>>> much of a corner with the API design for this KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 6:31 PM Matthias J. Sax <mj...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> it seems this KIP can just pick between a couple of tradeoffs.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Adding an overloaded `send()` as the KIP proposes makes sense to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> me and seems to provide the cleanest solution compared to the other
> > >>>>>>>>>>>>>>>>>>>>>>>>>> options we discussed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> The explicit name of the passed-in option, which highlights that
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the option is for TX only, makes it pretty clear and avoids the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> issue of `flush()` ambiguity.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Nit: We should make clear in the KIP, though, that the new
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `send()` overload would throw an `IllegalStateException` if TX are
> > >>>>>>>>>>>>>>>>>>>>>>>>>> not used (similar to other TX methods like initTx(), etc.).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> About the `Producer` interface, I am not sure whether we need a
> > >>>>>>>>>>>>>>>>>>>>>>>>>> default implementation for backward compatibility or not, or how
> > >>>>>>>>>>>>>>>>>>>>>>>>>> this was done in the past (e.g., KIP-266 added
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `Consumer.poll(Duration)` w/o a default implementation). If we do
> > >>>>>>>>>>>>>>>>>>>>>>>>>> want to add one, I think it would be appropriate to throw an
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `UnsupportedOperationException` by default, instead of just keeping
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the default impl empty.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> My points are rather minor, and should not block this KIP though.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Overall LGTM.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On 6/27/24 1:28 PM, Alieh Saeedi wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Making applications validate every single record is not the best
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> way, from an efficiency point of view.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, between changing the behavior of the Producer in `send`
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> and `commitTnx`, the former seems more reasonable and clean.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 8:14 PM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I see there are two options now. So folks will be discussing the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> approaches and deciding the best way forward before we vote?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I do think there could be a problem with the approach on commit if
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> we get stuck on an earlier error and have more records (potentially
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> on new partitions) to commit as the current PR is implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I guess this takes us back to the question of whether the error
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> should be cleared on send.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> (And I guess at the back of my mind, I'm wondering if there is a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> way we can validate the "poison pill" records application side
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> before we even try to send them.)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
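One way to read the idea of validating records application side is a pre-send check like the one below. It is a minimal sketch under stated assumptions: the application already knows which topics are compacted and which size limit applies, and `RecordPreValidator`/`validateRecord` are hypothetical names. It only covers conditions that are knowable locally; ACLs, topic existence, and other broker-side checks still surface only at send time, and it adds work on every record, which is the efficiency concern raised in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical application-side pre-validation run before send().
final class RecordPreValidator {
    private final Set<String> compactedTopics; // assumed to be known to the application
    private final int maxValueBytes;           // assumed limit, e.g. derived from config

    RecordPreValidator(Set<String> compactedTopics, int maxValueBytes) {
        this.compactedTopics = compactedTopics;
        this.maxValueBytes = maxValueBytes;
    }

    // Returns the problems found; an empty list means the record "looks sendable".
    List<String> validateRecord(String topic, String key, String value) {
        List<String> problems = new ArrayList<>();
        if (key == null && compactedTopics.contains(topic)) {
            problems.add("null key on compacted topic " + topic);
        }
        int size = value == null ? 0 : value.getBytes(StandardCharsets.UTF_8).length;
        if (size > maxValueBytes) {
            problems.add("value of " + size + " bytes exceeds " + maxValueBytes);
        }
        return problems;
    }
}
```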
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 4:38 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I did not update the KIP with `TxnSendOption` since I thought it'd
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> be better discussed here beforehand.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Right now, there are 2 PRs:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> - the PR that implements the current version of the KIP:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16332
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> - the POC PR that clarifies the `TxnSendOption`:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16465
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 12:42 AM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think I am a little confused. Are the 3 points above addressed by
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP or did something change? The PR seems to not include this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> change and still has the CommitOption as well.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 2:15 PM Alieh Saeedi <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking at the PR <https://github.com/apache/kafka/pull/16332>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> corresponding to the KIP, there are some points worthy of mention:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) clearing the error results in dirty/messy code in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `TransactionManager`.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) By clearing the error, we are actually doing an illegal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transition from `ABORTABLE_ERROR` to `IN_TRANSACTION`, which is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> conceptually not acceptable. This can be the root cause of some
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues, with perhaps further future changes by others.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
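Point 2 is easiest to see with an explicit transition table. The sketch below is a simplified model loosely based on the producer's transactional states, not a copy of Kafka's `TransactionManager`; `TxnState`, `TxnStateMachine` and the exact set of allowed transitions are assumptions chosen for illustration. In it, the only ways out of `ABORTABLE_ERROR` are to abort or to fail fatally, so "clearing" the error by moving back to `IN_TRANSACTION` is rejected.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical transaction states (not Kafka's actual enum).
enum TxnState { READY, IN_TRANSACTION, COMMITTING, ABORTING, ABORTABLE_ERROR, FATAL_ERROR }

final class TxnStateMachine {
    private static final Map<TxnState, Set<TxnState>> ALLOWED = new EnumMap<>(TxnState.class);

    static {
        ALLOWED.put(TxnState.READY, EnumSet.of(TxnState.IN_TRANSACTION));
        ALLOWED.put(TxnState.IN_TRANSACTION, EnumSet.of(TxnState.COMMITTING, TxnState.ABORTING,
                TxnState.ABORTABLE_ERROR, TxnState.FATAL_ERROR));
        ALLOWED.put(TxnState.COMMITTING, EnumSet.of(TxnState.READY, TxnState.ABORTABLE_ERROR,
                TxnState.FATAL_ERROR));
        ALLOWED.put(TxnState.ABORTING, EnumSet.of(TxnState.READY, TxnState.FATAL_ERROR));
        // No edge back to IN_TRANSACTION: an abortable error must be aborted.
        ALLOWED.put(TxnState.ABORTABLE_ERROR, EnumSet.of(TxnState.ABORTING, TxnState.ABORTABLE_ERROR,
                TxnState.FATAL_ERROR));
        ALLOWED.put(TxnState.FATAL_ERROR, EnumSet.of(TxnState.FATAL_ERROR));
    }

    private TxnState current = TxnState.READY;

    TxnState current() { return current; }

    void transitionTo(TxnState target) {
        if (!ALLOWED.get(current).contains(target)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        current = target;
    }
}
```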
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) If the poison pill record `r1` causes a transition to the error
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state and then the next record `r2` requires adding a partition to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the transaction, the action fails due to being in the error state.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this case, clearing errors during `commitTnx(CLEAR_SEND_ERROR)`
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is too late. However, this case should no longer be the main
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> concern once KIP-890 is fully implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My suggestion is to solve the problem where it arises. If the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transition to the error state does not happen during `send()`, we do
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not need to clear the error later. Therefore, instead of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `CommitOption`, we can define a `TxnSendOption` and prevent the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `send()` method from going to the error state in case 1) we're in a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction and 2) the user asked for `IGNORE_SEND_ERRORS`. For more
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> clarity, you can take a look at the POC PR
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <https://github.com/apache/kafka/pull/16465>.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
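The `TxnSendOption` proposal, together with the `r1`/`r2` scenario from point 3, can be illustrated with a toy model. Everything here is hypothetical (`ToyTxnProducer`, `SendErrorOption`, and modeling a poison pill as a `null` value); it sketches the intended semantics under those assumptions and is not the POC PR's code. With the default option, a poison pill moves the producer into the error state and the next send to a new partition fails; with `IGNORE_SEND_ERRORS`, the failure is reported only through that record's future and the transaction stays usable.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical option names taken from the discussion; not a released Kafka API.
enum SendErrorOption { NONE, IGNORE_SEND_ERRORS }

// Toy transactional producer: a "poison pill" is modeled as a null value.
final class ToyTxnProducer {
    private boolean inTransaction;
    private boolean abortableError;
    private final Set<String> partitionsInTxn = new HashSet<>();

    void beginTransaction() {
        inTransaction = true;
        abortableError = false;
        partitionsInTxn.clear();
    }

    boolean inAbortableError() { return abortableError; }

    CompletableFuture<Void> send(String partition, String value, SendErrorOption option) {
        if (option == SendErrorOption.IGNORE_SEND_ERRORS && !inTransaction) {
            throw new IllegalStateException("IGNORE_SEND_ERRORS requires an active transaction");
        }
        if (abortableError) {
            // Once in the error state, even adding a new partition to the transaction fails.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("transaction is in an abortable error state"));
        }
        if (value == null) {
            RuntimeException poison = new IllegalArgumentException("poison pill record");
            if (option != SendErrorOption.IGNORE_SEND_ERRORS) {
                abortableError = true; // default: the whole transaction must now abort
            }
            // In both cases the caller learns about this record's failure via its future.
            return CompletableFuture.failedFuture(poison);
        }
        partitionsInTxn.add(partition);
        return CompletableFuture.completedFuture(null);
    }
}
```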
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
