Hi Greg,

I appreciate your concerns and comprehensive answer.

I am not sure I fully understood what you meant. Do you mean that, in the
end, the user can choose one of the following scenarios? Either

1) `beginTxn()` and `send(record)` and `commitTxn()`  or

2) `beginTxn()` and `prepare(record)` and `send(prepared_record)` and
`commitTxn()` ?
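
To make the difference between the two scenarios concrete, here is a minimal sketch. Everything in it is a hypothetical stand-in written for illustration (`SketchProducer`, `Prepared`, the 16-character limit); it is not the real Kafka client API, and it simplifies the proposed `prepare()` to return an empty `Optional` instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-ins for illustration only; none of this is the real Kafka client API. */
final class Prepared {
    final String value;
    Prepared(String value) { this.value = value; }
}

final class SketchProducer {
    static final int MAX_RECORD_CHARS = 16; // assumed limit, chosen arbitrarily

    private final List<String> pending = new ArrayList<>();
    private boolean poisoned = false;

    void beginTxn() { pending.clear(); poisoned = false; }

    /** Scenario 1: send() validates; an oversize record poisons the whole transaction. */
    void send(String record) {
        if (record.length() > MAX_RECORD_CHARS) { poisoned = true; return; }
        pending.add(record);
    }

    /** Scenario 2, step 1: validate up front; nothing joins the transaction yet. */
    Optional<Prepared> prepare(String record) {
        return record.length() > MAX_RECORD_CHARS
                ? Optional.empty()
                : Optional.of(new Prepared(record));
    }

    /** Scenario 2, step 2: validation is already done, so send() only appends. */
    void send(Prepared prepared) { pending.add(prepared.value); }

    /** Returns what was committed; empty if the transaction was poisoned and aborted. */
    List<String> commitTxn() { return poisoned ? List.of() : List.copyOf(pending); }
}

final class TxnScenarios {
    static List<String> scenario1(List<String> records) {
        SketchProducer p = new SketchProducer();
        p.beginTxn();
        for (String r : records) {
            p.send(r);
        }
        return p.commitTxn();
    }

    static List<String> scenario2(List<String> records) {
        SketchProducer p = new SketchProducer();
        p.beginTxn();
        for (String r : records) {
            // The caller decides to skip records that could not be prepared.
            p.prepare(r).ifPresent(prepared -> p.send(prepared));
        }
        return p.commitTxn();
    }
}
```

In this sketch, scenario 1 loses the whole transaction when one record is oversize, while scenario 2 commits the remaining records because the oversize one never entered the transaction.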

Of course, the `send` in scenario 1 is different from the one in scenario
2, since part of the second one's job has already been done during



On Sat, Jul 20, 2024 at 1:20 AM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Artem and Matthias,
>
> > On the other hand, the effort to prove that
> > keeping all records in memory won't break some scenarios (and generally
> > breaking one is enough to cause a lot of pain) seems to be significantly
> > higher than to prove that setting some flag in some API has pretty much 0
> > chance of regression
>
> > in the end, why buffer records twice?
>
> > This way we don't
> > ignore the error, we're just changing the method they are delivered.
>
> > Very clean semantics
> > which should also address the concern of "non-atomic tx"
>
> I feel like my concerns are being minimized instead of being addressed
> in this discussion, and if that's because I'm not expressing them clearly,
> I apologize.
> Many users come to Kafka with prior expectations, especially when we use
> industry-standard terminology like "Exactly Once Semantics",
> "Transactions", "Commit", "Abort". Of course Kafka isn't an ACID-compliant
> database, but users will evaluate, discuss, and develop applications with
> Kafka through the lens of the ACID principles, because that is the
> framework most commonly applied to transactional semantics.
> The original design of KIP-98 [1] explicitly mentions atomic commits (with
> the same meaning as the A in ACID) as the primary abstraction being added
> (reproduced here):
> > At the core, transactional guarantees enable applications to produce to
> > multiple TopicPartitions atomically, ie. all writes to these
> > TopicPartitions will succeed or fail as a unit.
>
> > Further, since consumer progress is recorded as a write to the offsets
> > topic, the above capability is leveraged to enable applications to batch
> > consumed and produced messages into a single atomic unit, ie. a set of
> > messages may be considered consumed only if the entire
> > ‘consume-transform-produce’ executed in its entirety.
> I think it's important to say that to a user, "writes" really means "send()
> and commitOffsets() calls", not literal produce requests to Kafka brokers,
> and "consume-transform-produce" really means "poll(), transform, send()".
> This is because to a user, the implementation within poll() and send() and
> the broker are none of their concern, and are intended to be within the
> abstraction.
> When I say that this feature is a non-atomic commit, I mean that this
> feature does not fit the above description, and breaks the transaction
> abstraction in a meaningful way. No longer do all writes succeed or fail as
> a unit; some failures are permitted to drop data. No longer must a
> consume-transform-produce cycle be executed in its entirety; some parts may
> be left incomplete.
> This means that this method will be difficult to define ("which exceptions
> are covered?"), difficult to document ("how do we explain
> 'not-really-atomic commits' clearly and unambiguously to a potential
> user?"), and difficult to compose ("if someone turns this option on, how
> does that affect delivery guarantees and opportunities for bugs in
> upper layers?").
> Users currently rely heavily on analogies to other database systems to make
> sense of Kafka's transactions, and we need to use that to our benefit,
> rather than designing in spite of it being true.
> However this atomicity guarantee isn't always desirable, as evidenced by
> the original bug report [2]. If you're interacting with a website form for
> example, and a database transaction fails because one of your strings is
> oversize, you don't need to re-input all of your form responses from
> scratch, as there is an application layer/browser in-between to preserve
> the state and retry the transaction.
> And while you could make a convenience/performance/etc argument in that
> situation ("The database should truncate/null-out the oversize string") and
> modern databases often have very expressive DML that would permit such a
> behavior (try-catch, etc), the End-to-End arguments [3] make me believe
> that is a bad design and should be discouraged.
> To that end, I was suggesting ways to push this farther and farther up the
> stack, such as performing record size estimation. This doesn't mean that it
> can't be added at a low level of abstraction, just that we need to make
> sure to exhaust all other alternatives, and justify it with a performance
> benefit.
> I was holding off on discussing the literal design until you provided
> concrete performance justification, but to progress the discussion while
> I'm waiting for that, I can give my thoughts:
> I don't think an overloaded send() method is appropriate given that this
> appears to be a niche use-case, and the send() method itself is probably
> the single most important method in the Clients library. The KIP-98 design
> was a much more substantial change to the Producer than this KIP, and it
> found a way to preserve the original type signature (but added an
> exception).
> Users picking up the Producer for the first time may see this additional
> method, and may spend time trying to understand whether it is something
> suitable for their use-case. In the best case, they ignore it and use the
> other two signatures. But it is also possible that they will use it without
> understanding it, and have unexpected data loss. Overall, this feels like a
> net negative to the producer user-base.
> Also, boolean, enum, vararg, and flag parameters don't follow the current
> AdminClient convention of passing Options DTOs to modify individual API
> calls, which is a much more extensible design.
> If there was a way of preparing a record before calling send() that did
> some or all of the pre-send validation (serialization, size checking, maybe
> topic-partition existence, authorization, etc) that could be a
> reasonable way to emit these sorts of errors in a way that makes it clear
> that sending hasn't started and the record is not part of the transaction
> yet. I'm imagining something like:
> ```
> public abstract class PreparedRecord<K, V> extends ProducerRecord<K, V> { }
> // Actual instantiated class and methods could be an implementation detail.
>
> interface Producer<K, V> {
>     Optional<PreparedRecord<K, V>> prepare(ProducerRecord<K, V> record, PrepareOptions options)
>         throws SerializationException, RecordTooLargeException;
> }
>
> // Application code:
> Optional<PreparedRecord<byte[], byte[]>> prepared = Optional.empty();
> try {
>     prepared = producer.prepare(record, null);
> } catch (Exception e) {
>     if (critical(e)) {
>         producer.abortTransaction();
>         return;
>     }
>     // Non-critical failure: `prepared` stays empty, so the record is skipped below.
> }
> if (prepared.isPresent()) {
>     // Errors will be propagated via commitTransaction() and also abort the transaction.
>     producer.send(prepared.get());
> }
> ```
> Semantically then, it would make sense that the data which has been
> "prepared to send" but has not been "sent" is intentionally left out of the
> batch by the application control-flow, not an option passed into the
> producer changing the control-flow within the Producer. If passed a
> prepared record, the send method can then be responsible only for waiting
> for sufficient buffer capacity, reusing the work done by the prepare()
> method.
> This prepare method could also enable other use-cases, like parallel
> serialization, exception-less handling, or more advanced buffering/balking
> behavior ("refuse to prepare records that can't be sent without blocking").
> And because the synchronous processing is made explicit, it's much easier
> to communicate to users which exceptions are covered and can be prevented
> from causing transaction aborts.
> If someone uses the method, or doesn't use it, they aren't put at risk of
> compromising their delivery guarantees unexpectedly.
> Thanks,
> Greg
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> [2] https://issues.apache.org/jira/browse/KAFKA-15259
> [3] https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf
> On Fri, Jul 19, 2024 at 12:39 PM Matthias J. Sax <mj...@apache.org> wrote:
> > For catching client side errors this would work IMHO. I am ok with this.
> >
> > We throw before we add the record to the batch. Very clean semantics
> > which should also address the concern of "non-atomic tx"... The
> > exception clearly indicates that the record was not added to the TX, and
> > users can react to it accordingly.
> >
> > We did discuss this idea previously, but did not have a good proposal to
> > make it backward compatible. The newly proposed overload would address
> > this issue of backward compatibility.
> >
> > Of course, it might not make it easily extensible in the future for
> > broker side errors, but it's unclear anyway right now, if we would even
> > get to a solution for broker side errors or not -- so maybe it's ok to
> > accept this and drop/ignore the broker side error question for now.
> >
> >
> >
> > A small follow up thought/question: instead of using a boolean, would we
> > actually want to make it a var-arg enum to allow users to enable this
> > for certain errors explicitly and individually? Besides the added
> > flexibility and fine-grained control, a var-arg enum would also make the
> > API nicer/cleaner IMHO compared to a boolean.
> >
> > For convenience, this enum could have an additional `ALL` option (and we
> > would call out that if `ALL` is used, new error types might be added in
> > future releases, making the code less safe/robust -- ie, use at your own
> > risk only)
> >
> > This way, we also explicitly document what exception might be thrown in
> > the KIP, as we would add an enum for each error type explicitly, and
> > also make it future-proof for new error types we want to cover -- each
> > addition would require a KIP to extend the enum.
> >
> >
> >
> > -Matthias
> >
> >
> > On 7/18/24 10:33 PM, Artem Livshits wrote:
> > > Hey folks,
> > >
> > > Hopefully not to make this KIP go for another spin :-), but I thought of a
> > > modification that might actually address safety concerns over using flags
> > > to ignore a vaguely specified class of errors.
> > >
> > > What if we had the following overload of .send method:
> > >
> > >    void send(ProducerRecord record, Callback callback, boolean throwImmediately)
> > >
> > > and if throwImmediately=false, then we behave the same way as now (return
> > > errors via Future and poison transaction) and if throwImmediately=true then
> > > we just throw errors immediately from the send function.  This way we don't
> > > ignore the error, we're just changing the method they are delivered.  Then
> > > KStreams can catch the error for send(record, callback, true) and do
> > > whatever it needs to do.
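
The two delivery modes described above can be sketched as follows. This is a minimal model under stated assumptions, not the producer's implementation: `ThrowImmediatelySketch`, `RecordTooLarge`, and the 16-character limit are hypothetical, and the `throwImmediately` overload is only a proposal in this thread, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposed overload; not the real KafkaProducer. */
final class ThrowImmediatelySketch {
    static final int MAX_RECORD_CHARS = 16; // assumed limit, chosen arbitrarily

    static final class RecordTooLarge extends RuntimeException {
        RecordTooLarge(String message) { super(message); }
    }

    final List<String> batch = new ArrayList<>();
    boolean poisoned = false;

    void send(String record, boolean throwImmediately) {
        if (record.length() > MAX_RECORD_CHARS) {
            if (throwImmediately) {
                // Thrown before the record is added, so the transaction stays usable.
                throw new RecordTooLarge("record of " + record.length() + " chars");
            }
            // "Same way as now": the error is deferred and the transaction is poisoned.
            poisoned = true;
            return;
        }
        batch.add(record);
    }

    /** Caller-side handling: catch the immediate error and skip that record. */
    static ThrowImmediatelySketch sendAllSkippingOversize(List<String> records) {
        ThrowImmediatelySketch producer = new ThrowImmediatelySketch();
        for (String record : records) {
            try {
                producer.send(record, true);
            } catch (RecordTooLarge e) {
                // The caller explicitly chooses to drop this record.
            }
        }
        return producer;
    }
}
```

The point of the sketch is that the decision to drop a record sits in the caller's `catch` block, not behind a flag inside the producer.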
> > >
> > > -Artem
> > >
> > >
> > > On Mon, Jul 15, 2024 at 4:30 PM Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > >
> > >> Matthias,
> > >>
> > >> Thank you for rejecting my suggested alternatives. Your responses are the
> > >> sorts of things I expected to see summarized in the text of the KIP.
> > >>
> > >> I agree with most of your rejections, except this one:
> > >>
> > >>> "Estimation" is not sufficient, but we would need to know it exactly.
> > >>> And that's an impl detail, given that the message format could change
> > >>> and we could add new internal fields increasing the message size.
> > >>
> > >> An estimate is certainly going to have an error. But an estimate shouldn't
> > >> be treated as exact anyway; there should be an error bound, or "safety
> > >> factor", used when interpreting it. For example, if the broker side limit is
> > >> 1MB, and an estimate could be wrong by ~10%, then computing an estimate and
> > >> dropping records >900kb should be sufficient to prevent RTLEs.
> > >> This is the sort of estimation that I would expect application developers
> > >> could implement, without knowing the exact serialization and protocol
> > >> overhead. This could prevent user-originated oversize records from making
> > >> it to the producer.
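
A minimal sketch of the estimate-with-safety-factor idea, using the numbers from the example above. The class name `SizeGuard`, the 1,000,000-byte limit, and the 10% margin are assumptions for illustration; the estimate deliberately ignores serialization and protocol overhead, which is exactly what the margin is meant to absorb.

```java
/** Hypothetical application-side guard; names and numbers are illustrative assumptions. */
final class SizeGuard {
    static final long BROKER_LIMIT_BYTES = 1_000_000L; // assumed "1MB" limit from the example
    static final long SAFETY_MARGIN_PERCENT = 10L;     // assumed worst-case estimation error

    /** Largest estimated size that is still considered safe to hand to the producer. */
    static long safeThresholdBytes() {
        return BROKER_LIMIT_BYTES * (100L - SAFETY_MARGIN_PERCENT) / 100L;
    }

    /** Rough estimate: key and value payload only, with no protocol overhead. */
    static long estimateBytes(byte[] key, byte[] value) {
        long keyBytes = key == null ? 0 : key.length;
        long valueBytes = value == null ? 0 : value.length;
        return keyBytes + valueBytes;
    }

    /** True if the application should drop the record instead of sending it. */
    static boolean shouldDrop(byte[] key, byte[] value) {
        return estimateBytes(key, value) > safeThresholdBytes();
    }
}
```

With these assumed numbers the threshold works out to 900,000 bytes, so a record is dropped only when its estimated payload exceeds that.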
> > >>
> > >> Thanks,
> > >> Greg
> > >>
> > >>
> > >> On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax <mj...@apache.org>
> > >> wrote:
> > >>
> > >>> I agree with Alieh and Artem -- in the end, why buffer records twice? We
> > >>> effectively want to allow pushing some error handling (which I btw
> > >>> consider "business logic") into the producer. IMHO, there is nothing
> > >>> wrong with it. Dropping a poison pill record is not really a violation of
> > >>> atomicity from my POV, but a business logic decision to not include a
> > >>> record in a transaction -- the proposed API just makes it much simpler
> > >>> to achieve this business logic goal.
> > >>>
> > >>>
> > >>>
> > >>> For memory size estimation, throughput or message size is actually not
> > >>> relevant, right? We would need to look at producer buffer size, ie,
> > >>> `batch.size`, `max.in.flight.requests.per.connection` and guesstimate the
> > >>> number of connections there might be? At least for KS, we don't need to
> > >>> buffer everything until commit, but only until we get a successful "ack"
> > >>> back.
> > >>>
> > >>> Note that KS applications not only need to write to (a single) user
> > >>> result topic, but multiple output topics, as well as repartition and
> > >>> changelog topics, across all tasks assigned to a thread (ie, producer),
> > >>> which can easily be 10 tasks or more. If we assume topics with 30
> > >>> partitions (topics with 50 or more partitions are not uncommon either),
> > >>> and a producer that must write to 10 different topics, the number of
> > >>> required connections is very quickly very high, and thus the required
> > >>> "application buffer space" would be significant.
> > >>>
> > >>>
> > >>>
> > >>> Your other ideas do not seem to be viable alternatives:
> > >>>
> > >>>> Streams users that specifically want to drop oversize records can
> > >>>> estimate the size of their data and drop records which are too
> > >>>> large, enforcing their own limits that are lower than the Kafka limits.
> > >>>
> > >>> "Estimation" is not sufficient, but we would need to know it exactly.
> > >>> And that's an impl detail, given that the message format could change
> > >>> and we could add new internal fields increasing the message size. The
> > >>> idea to add some `producer.serializedRecordSize()` helper method was
> > >>> discussed, but it's a very ugly API and clumsy to use -- also, the user
> > >>> code would need to know the producer config which it might not have
> > >>> access to (as it might get passed in from some config file; and it might
> > >>> also be changed).
> > >>>
> > >>> Another alternative we also discussed was to let `send()` throw an
> > >>> exception for a "record too large" case directly. However, this solution
> > >>> raises backward compatibility concerns, and it might also not help us to
> > >>> extend the solution in the future (eg, tackle broker side errors). So we
> > >>> discarded this idea.
> > >>>
> > >>>
> > >>>
> > >>>> Streams users that want CONTINUE semantics can use at_least_once
> > >>>> semantics
> > >>>
> > >>> Not really. EOS is mainly about not having duplicates in the result, but
> > >>> at-least-once cannot provide this guarantee. (Even if I repeat myself:
> > >>> dropping a poison pill record based on a business logic decision is
> > >>> not data loss, but effectively a business logic filter...)
> > >>>
> > >>>
> > >>>
> > >>>> Streams itself can store record hashes/coordinates and fast rewind to
> > >>>> the end of the last transaction, recomputing data rather than storing it.
> > >>>
> > >>> Given the very complex nature of topologies, with joins, aggregations,
> > >>> flatmaps etc, this is a 100x more complex solution and not viable in
> > >>> practice.
> > >>>
> > >>>
> > >>>
> > >>>> Streams can define exactly_once + CONTINUE semantics to permit the whole
> > >>>> transaction to be dropped, because it would allow the next batch to be
> > >>>> started processing.
> > >>>
> > >>> Would this not be much worse? I have a single poison pill record and
> > >>> would need to drop a full tx (this could be tens of thousands of
> > >>> records...). Also, given that KS writes into changelog topics in the same
> > >>> TX, this could break the whole application.
> > >>>
> > >>>
> > >>>
> > >>>> Streams can emit records with both a transactional and non-transactional
> > >>>> producer if some records are not critical-path
> > >>>
> > >>> We (1) already have a "too many connections" problem with KS so using
> > >>> more clients is something we try to avoid (and we actually hope to
> > >>> reduce the number of clients and connections mid to long term), (2) this
> > >>> would be very hard to express at the API level to the user, and (3) it
> > >>> would provide very weird semantics.
> > >>>
> > >>>
> > >>>
> > >>>> they should optimize for smaller transactions,
> > >>>
> > >>> IMHO, this would not work in practice because transactions have a high
> > >>> overhead and commit-interval is used to trade off throughput vs
> > >>> end-to-end latency. Given a certain throughput requirement, it would not
> > >>> be possible to just use a lower commit interval to reduce memory
> > >>> requirements.
> > >>>
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On 7/15/24 2:25 PM, Artem Livshits wrote:
> > >>>> Hi Greg,
> > >>>>
> > >>>>> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary set
> > >>>>> of error conditions that may be expanded in the future, possibly to cover
> > >>>>> the broker side RecordTooLargeException.
> > >>>>
> > >>>> I don't think it contradicts what I said (the keyword here is "in the
> > >>>> future") -- with the current functionality, the correct way to handle RTLE
> > >>>> is by only letting the client ignore client-originated RTLE (this can be
> > >>>> easily implemented on the client side).  In the future, we can improve on
> > >>>> that by making the broker return a different exception for the
> > >>>> batch-too-large case, then the producer would be able to return broker side
> > >>>> exceptions as well (and if the application chooses to ignore it -- it will
> > >>>> be able to, but it would be an explicit choice rather than ignoring it by
> > >>>> mistake). In this case the producer client would encapsulate backward
> > >>>> compatibility logic when it connects to older brokers to make sure the
> > >>>> application doesn't accidentally get RTLE originated by the old broker.
> > >>>> This functionality is obviously more involved and we'll need to see if going
> > >>>> all the way is justified, but the partial client-only solution doesn't close
> > >>>> the door.
> > >>>>
> > >>>> So one way to look at the current situation is the following:
> > >>>>
> > >>>> 1. We can do a low effort partial solution to solve a real existing
> > >>>> problem.  We can easily prove that it would do exactly what it needs to do
> > >>>> with minimal risk of regression.
> > >>>> 2. We have a path to a more comprehensive solution, so if we justify the
> > >>>> effort required for that, we can get there.
> > >>>>
> > >>>> BTW, as a side note (I think I saw a question in the thread), we do try to
> > >>>> introduce error categories here
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > >>>> so eventually we may have a better classification for the errors.
> > >>>>
> > >>>>> "if a streams producer is producing 1MB/s, and the commit interval is 1
> > >>>>> hour, I expect 3600MB of additional heap needed ...
> > >>>>
> > >>>> Agree, that would be ideal.  On the other hand, the effort to prove that
> > >>>> keeping all records in memory won't break some scenarios (and generally
> > >>>> breaking one is enough to cause a lot of pain) seems to be significantly
> > >>>> higher than to prove that setting some flag in some API has pretty much 0
> > >>>> chance of regression (we basically have a flag to say "unfix KAFKA-9279" so
> > >>>> we're getting to fairly "known good" state).  I'll let KStream folks
> > >>>> comment on this one (and we still need to solve the problem of accidental
> > >>>> handling of RTLE originated from broker, so some KIP would be required to
> > >>>> somehow help to differentiate those).
> > >>>>
> > >>>> -Artem
> > >>>>
> > >>>> On Mon, Jul 15, 2024 at 1:31 PM Greg Harris <greg.har...@aiven.io.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Artem,
> > >>>>>
> > >>>>> Thank you for clarifying as I'm joining the conversation late and may have
> > >>>>> some misconceptions.
> > >>>>>
> > >>>>>> Because of this, a more "complete" solution that
> > >>>>>> allows ignoring RecordTooLargeException regardless of its origin is
> > >>>>>> actually incorrect, while a "partial" solution that allows ignoring
> > >>>>>> RecordTooLargeException only originating in client code accomplishes the
> > >>>>>> required functionality.
> > >>>>>
> > >>>>> This is not how I understood this feature. Above Matthias said the
> > >>>>> following:
> > >>>>>
> > >>>>>> We can do
> > >>>>>> follow up KIP for other errors on an on-demand basis and fix-forward /
> > >>>>>> enlarge the scope successively.
> > >>>>>
> > >>>>> This makes me think that this IGNORE_SEND_ERRORS covers an arbitrary set of
> > >>>>> error conditions that may be expanded in the future, possibly to cover the
> > >>>>> broker side RecordTooLargeException.
> > >>>>>
> > >>>>>> Obviously, we could solve this problem by changing logic in the
> > >>>>>> broker to return a different error when the batch is too large, but right
> > >>>>>> now this is not the case
> > >>>>>
> > >>>>> If the broker/wire protocol isn't ready for these errors to be propagated,
> > >>>>> then I don't think we're ready to add this API. It's going to be
> > >>>>> under-generalized, and there's a decent chance that we're going to regret
> > >>>>> the design choices in the future. And users that expect it to be fully
> > >>>>> generalized are going to be disappointed when they don't read the fine
> > >>>>> print and still get faulted by non-covered errors.
> > >>>>>
> > >>>>>> AL2.  In a high performance system, "just an optimization" can be a
> > >>>>>> functional requirement ...
> > >>>>>>    I just wanted to make the point that we shouldn't necessarily dismiss
> > >>>>>> API changes that allow for optimizations.
> > >>>>>
> > >>>>> My earlier statement didn't dismiss this feature as "just an optimization",
> > >>>>> actually the opposite. I said that performance could be a justification,
> > >>>>> but only if it is quantified and stated explicitly. We shouldn't be voting
> > >>>>> on hand-wavy optimizations; we should be voting on things that are
> > >>>>> quantifiable.
> > >>>>> For example an analysis like the following would facilitate further
> > >>>>> discussion: "if a streams producer is producing 1MB/s, and the commit
> > >>>>> interval is 1 hour, I expect 3600MB of additional heap needed per
> > >>>>> producer". We can then discuss whether we expect higher or lower
> > >>>>> throughput, commit intervals, or heap usage to determine what the operating
> > >>>>> envelope of this feature could be.
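
The arithmetic behind that example can be written down as a small helper for exploring other operating points. This is only a back-of-the-envelope sketch; `BufferEstimate` is a hypothetical name, and it assumes every produced record is retained until the transaction commits, which is the worst case being discussed.

```java
/** Hypothetical back-of-the-envelope helper; assumes all records are kept until commit. */
final class BufferEstimate {
    /** Worst-case megabytes retained per producer over one commit interval. */
    static long retainedMegabytes(long throughputMbPerSec, long commitIntervalSec) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(throughputMbPerSec, commitIntervalSec);
    }
}
```

For the quoted example, `BufferEstimate.retainedMegabytes(1, 3600)` gives 3600, matching the 3600MB figure; the same call with a 30-second interval gives 30.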
> > >>>>> If there are a substantial number of users that have high throughput, long
> > >>>>> commit intervals, _and_ RTLEs, then this feature could make sense. If not,
> > >>>>> then the downsides of this feature (complication of the API,
> > >>>>> under-specification of the error coverage, etc) look unjustified. In fact,
> > >>>>> if the number of users regularly encountering RTLEs is sufficiently small,
> > >>>>> I would strongly advocate for an application-specific workaround instead of
> > >>>>> trying to fix this in Streams, or make memory buffering an optional feature
> > >>>>> in streams.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Greg
> > >>>>>
> > >>>>> On Mon, Jul 15, 2024 at 1:29 PM Greg Harris <greg.har...@aiven.io> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>>
> > >>>>>> Thanks for your response.
> > >>>>>>
> > >>>>>>> what does a user do
> > >>>>>>> after a transaction is failed due to a `too-large-record` exception? They
> > >>>>>>> will submit the same batch without the problematic record again.
> > >>>>>>
> > >>>>>> If they re-submit the same record, they are indicating that this record is
> > >>>>>> an integral part of the transaction, and the transaction should only be
> > >>>>>> committed with it present. If the record isn't integral to the transaction,
> > >>>>>> they shouldn't submit it as part of the transaction.
> > >>>>>>
> > >>>>>>> Regarding your solution to solve the issue application-side:  I am a
> > >>>>>>> bit hesitant to keep all sent records in memory since I think buffering
> > >>>>>>> records twice (both in Streams and Producer) would not be an efficient
> > >>>>>>> solution.
> > >>>>>>
> > >>>>>> I understand your hesitation, and this touches on the "performance"
> > >>>>>> caveat of the end-to-end arguments in system design. There are no perfect
> > >>>>>> designs, and some API cleanliness may be sacrificed in favor of more
> > >>>>>> performant solutions. You would need to make a concrete and convincing
> > >>>>>> argument that the performance of this solution would be better than every
> > >>>>>> alternative. To that end, I would recommend that you add more to the
> > >>>>>> "Rejected Alternatives" section, as that is going to carry this proposal.
> > >>>>>> Some alternatives that I can think of, but which aren't necessarily
> > >>>>>> better:
> > >>>>>> 1. Streams users that specifically want to drop oversize records can
> > >>>>>> estimate the size of their data and drop records which are too
> > >>>>>> large, enforcing their own limits that are lower than the Kafka limits.
> > >>>>>> 2. Streams users that want CONTINUE semantics can use at_least_once
> > >>>>>> semantics
> > >>>>>> 3. Streams itself can store record hashes/coordinates and fast rewind to
> > >>>>>> the end of the last transaction, recomputing data rather than storing it.
> > >>>>>> 4. Streams can define exactly_once + CONTINUE semantics to permit the
> > >>>>>> whole transaction to be dropped, because it would allow the next batch to
> > >>>>>> be started processing.
> > >>>>>> 5. Streams can emit records with both a transactional and
> > >>>>>> non-transactional producer if some records are not critical-path
> > >>>>>>
> > >>>>>> To generalize this point: Suppose an application tries to minimize storage
> > >>>>>> costs by having only one party responsible for a piece of data at a time.
> > >>>>>> They initially have the data, call send(), and want to know the earliest
> > >>>>>> time they can forget the data and transfer the responsibility to Kafka.
> > >>>>>> With a non-transactional producer, they are responsible for the data until
> > >>>>>> the send() callback has succeeded. With a transactional producer, they are
> > >>>>>> responsible for the data until commitTransaction() has succeeded.
> > >>>>>> With this proposed change that makes the producer tolerate
> > >>>>>> too-large-exceptions, applications are still responsible for storing their
> > >>>>>> data until commitTransaction() has succeeded, because abortTransaction()
> > >>>>>> could have also been called, or the producer could have been fenced, or any
> > >>>>>> number of other failures could have occurred. This feature does not enable
> > >>>>>> Streams to drop responsibility earlier; it carves out a specific situation
> > >>>>>> in which it doesn't have to rewind processing, which is a performance
> > >>>>>> concern.
> > >>>>>>
> > >>>>>> For Streams and the general case, if an application is trying to optimize
> > >>>>>> storage costs, they should optimize for smaller transactions, because this
> > >>>>>> both lowers the bound on record re-delivery and lowers the likelihood of a
> > >>>>>> bad record being included in any individual transaction.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Greg
> > >>>>>>
> > >>>>>> On Mon, Jul 15, 2024 at 12:35 PM Artem Livshits
> > >>>>>> <alivsh...@confluent.io.invalid> wrote:
> > >>>>>>
> > >>>>>>> Hi Greg,
> > >>>>>>>
> > >>>>>>> What you say makes a lot of sense.  I just wanted to clarify a couple of
> > >>>>>>> subtle points.
> > >>>>>>>
> > >>>>>>> AL1. There is a functional reason to handle errors that happen on send
> > >>>>>>> (originate in the producer logic in the client) vs. errors that are
> > >>>>>>> returned from the broker.  The problem is that RecordTooLargeException is
> > >>>>>>> returned in two cases: (1) the producer logic on the client checks that the
> > >>>>>>> record is too large and throws the exception before doing anything with
> > >>>>>>> this -- this is a very "clean" situation with one specific record being
> > >>>>>>> marked as "poison pill" and rejected; (2) the broker throws the same error
> > >>>>>>> if the batch is too large -- the batch may include multiple records and
> > >>>>>>> none of them would necessarily be a "poison pill" record, it's just a
> > >>>>>>> random misconfiguration of client vs. broker.  Because of this, a more
> > >>>>>>> "complete" solution that allows ignoring RecordTooLargeException regardless
> > >>>>>>> of its origin is actually incorrect, while a "partial" solution that allows
> > >>>>>>> ignoring RecordTooLargeException only originating in client code
> > >>>>>>> accomplishes the required functionality.  This is an important nuance and
> > >>>>>>> should be added to the KIP.  Obviously, we could solve this problem by
> > >>>>>>> changing logic in the broker to return a different error when the batch is
> > >>>>>>> too large, but right now this is not the case (and to have the correct
> > >>>>>>> error handling we'd need to know the version of the broker so we can only
> > >>>>>>> drop the records if the error is returned from a broker that knows to
> > >>>>>>> return a different error).
> > >>>>>>>
> > >>>>>>> AL2.  In a high performance system, "just an optimization" can be a
> > >>>>>>> functional requirement -- if a solution impacts memory or computational
> > >>>>>>> complexity (in the sense of bigO notation) on the main code path I can
> > >>>>>>> justify changing APIs to avoid such an impact.  I'll let KStream folks
> > >>>>>>> comment on whether an implementation that requires storing records in
> > >>>>>>> memory actually violates the computational complexity on the main code
> > >>>>>>> path, I just wanted to make the point that we shouldn't necessarily dismiss
> > >>>>>>> API changes that allow for optimizations.
> > >>>>>>>
> > >>>>>>> -Artem
> > >>>>>>>
> > >>>>>>> On Fri, Jul 12, 2024 at 1:07 PM Greg Harris
> > >>>>> <greg.har...@aiven.io.invalid
> > >>>>>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> Alieh, thanks for the KIP! And everyone else, thanks for the
> > robust
> > >>>>>>>> discussion.
> > >>>>>>>>
> > >>>>>>>> I understand that there are situations in which users desire
> that
> > >> the
> > >>>>>>>> pipeline "just keep working" and skip errors. However, I
> question
> > >>>>>>> whether
> > >>>>>>>> it is appropriate to support/encourage this behavior via
> inclusion
> > >> in
> > >>>>>>> the
> > >>>>>>>> Producer API.
> > >>>>>>>> This feature is essentially a "non-atomic transaction", as it
> > >> allows
> > >>>>>>>> commits in which not all records passed to send() ultimately get
> > >>>>>>> committed.
> > >>>>>>>> As atomicity is one of the most important semantics associated
> > with
> > >>>>>>>> transactions, I question whether there are users other than
> > Streams
> > >>>>> that
> > >>>>>>>> would choose non-atomic transactions over a
> traditional/idempotent
> > >>>>>>>> producer.
> > >>>>>>>> Some cursory research shows that non-atomic transactions may be
> > >>>>> present
> > >>>>>>> in
> > >>>>>>>> other databases, but are actively discouraged due to the
> complexity
> > >>>>> they
> > >>>>>>> add
> > >>>>>>>> to error-handling. [1]
> > >>>>>>>>
> > >>>>>>>> I'd like to invoke the End-to-End Arguments in System Design [2]
> > >>> here,
> > >>>>>>> and
> > >>>>>>>> recommend that this behavior may be present in Streams, but
> should
> > >>> not
> > >>>>>>> be
> > >>>>>>>> in the Producer.
> > >>>>>>>> 1. Dropping records that cause errors is already expressible via the
> > >>>>>>>> current Producer API. You can store the records in-memory after
> > >>>>>>>> calling send(), wait for a successful no-error flush() before calling
> > >>>>>>>> commitTransaction() and allowing the record to be garbage collected.
> > >>>>>>>> If errors occur, abortTransaction() and re-submit the records.
> > >>>>>>>> 2. Implementing this inside the Producer API is complex and difficult
> > >>>>>>>> to holistically define in a way that we won't regret or need to
> > >>>>>>>> change later. I think some of the disagreement in this thread
> > >>>>>>>> originates from this, and I don't find the proposed API satisfactory.
> > >>>>>>>> 3. The performance improvement of including this change in the lower
> > >>>>>>>> level needs to be quantified in order to be a justification, and I
> > >>>>>>>> don't see any analysis about this.
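
The pattern described in point 1 can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the real client API: `TxnProducer` is a hypothetical stand-in for the transactional subset of the Java producer, and its `send` is assumed to return a boolean, whereas the real `KafkaProducer.send()` reports errors through a `Future` or callback.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedTxnSketch {

    /** Hypothetical stand-in for a transactional producer; not the Kafka API. */
    interface TxnProducer {
        void beginTransaction();
        /** Assumed to return true only if the record was accepted without error. */
        boolean send(String record);
        void commitTransaction();
        void abortTransaction();
    }

    /**
     * Sends all records in one transaction. If any record fails, the
     * transaction is aborted and retried without the failed records.
     * Returns the records that were dropped.
     */
    static List<String> commitDroppingBadRecords(TxnProducer producer, List<String> records) {
        List<String> pending = new ArrayList<>(records); // application-side buffer
        List<String> dropped = new ArrayList<>();
        while (true) {
            producer.beginTransaction();
            List<String> failed = new ArrayList<>();
            for (String record : pending) {
                if (!producer.send(record)) {
                    failed.add(record);
                }
            }
            if (failed.isEmpty()) {
                producer.commitTransaction(); // each committed transaction stays atomic
                return dropped;
            }
            producer.abortTransaction();
            pending.removeAll(failed); // re-submit only the good records
            dropped.addAll(failed);
        }
    }
}
```

The loop terminates because every retry removes at least one record from the buffer. The returned list is what an application could hand to a DLQ or an error handler, at the cost of keeping the records in memory until the commit succeeds.
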
> > >>>>>>>>
> > >>>>>>>> I imagine that the alternative implementation I suggested in (1)
> > >>> would
> > >>>>>>> also
> > >>>>>>>> enable more expressive error handlers in Streams, if such a
> thing
> > >> was
> > >>>>>>>> desired. Keeping the record around until after the transaction
> is
> > >>>>>>> committed
> > >>>>>>>> would enable a DLQ or passing the erroneous record to the error
> > >>>>> handler.
> > >>>>>>>>
> > >>>>>>>> I think that the current pattern of the application being
> > >> responsible
> > >>>>>>> for
> > >>>>>>>> providing good data to the producer is very reasonable; Having
> the
> > >>>>>>> producer
> > >>>>>>>> responsible for implementing the application's error handling of
> > >> bad
> > >>>>>>> data
> > >>>>>>>> is not something I can support.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Greg
> > >>>>>>>>
> > >>>>>>>> [1] https://www.sommarskog.se/error_handling/Part1.html
> > >>>>>>>> [2]
> > >>>>> https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf
> > >>>>>>>>
> > >>>>>>>> On Fri, Jul 12, 2024 at 8:52 AM Justine Olshan
> > >>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Can we update the KIP to clearly document these decisions?
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>>
> > >>>>>>>>> Justine
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Jul 9, 2024 at 9:25 AM Andrew Schofield <
> > >>>>>>>> andrew_schofi...@live.com
> > >>>>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Chris,
> > >>>>>>>>>> As it stands, the error handling for transactions in
> > >> KafkaProducer
> > >>>>>>> is
> > >>>>>>>> not
> > >>>>>>>>>> ideal. There’s no reason why a failed operation should fail a
> > >>>>>>>> transaction
> > >>>>>>>>>> provided that the application can tell that the operation was
> > not
> > >>>>>>>>> included
> > >>>>>>>>>> in the transaction and then make its own decision whether to
> > >>>>>>> continue
> > >>>>>>>> or
> > >>>>>>>>>> back out. So, I think I disagree with the original premise of
> a
> > >>>>>>>>> client-side
> > >>>>>>>>>> error state for a transaction, but we are where we are.
> > >>>>>>>>>>
> > >>>>>>>>>> When I voted, I did not expect the KIP to handle ALL errors
> > which
> > >>>>>>> could
> > >>>>>>>>>> conceivably be handled. I did expect it to handle client-side
> > >> send
> > >>>>>>>> errors
> > >>>>>>>>>> that would cause a record to be rejected from a batch before
> > >>>>> sending
> > >>>>>>>> to a
> > >>>>>>>>>> broker. I think that it does make the KafkaProducer interface
> > >> very
> > >>>>>>>>> slightly
> > >>>>>>>>>> more complicated, but the new option is a clear improvement
> and
> > I
> > >>>>>>>>>> don’t see anyone getting into a mess using it.
> > >>>>>>>>>>
> > >>>>>>>>>> I think broker-side errors are more tricky and I don’t think
> an
> > >>>>>>>> overload
> > >>>>>>>>>> on the send() method is going to do the job. I don’t see that
> as
> > >> a
> > >>>>>>>>> problem
> > >>>>>>>>>> with the KIP, just that the underlying RPCs and behaviour are
> not
> > >>>>>>> very
> > >>>>>>>>>> amenable to record-specific error handling. The Produce RPC
> is a
> > >>>>>>>>>> complicated beast which can include a set of records for
> multiple
> > >>>>>>>>>> topic-partitions. Although ProduceResponse v10 does include
> > >> record
> > >>>>>>>>>> errors, I don’t believe this is surfaced in the client. Let’s
> > >>>>>>> imagine
> > >>>>>>>>>> something
> > >>>>>>>>>> like broker-side record validation which barfs on one record.
> > >>>>>>> Failing
> > >>>>>>>> an
> > >>>>>>>>>> entire batch is easier, but less useful if the problem is
> > related
> > >>>>> to
> > >>>>>>>> one
> > >>>>>>>>>> record.
> > >>>>>>>>>>
> > >>>>>>>>>> In summary, I’m happy that my vote stands, and I am happy with
> > >> the
> > >>>>>>> KIP
> > >>>>>>>>>> only supporting client-side record errors.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Andrew
> > >>>>>>>>>>
> > >>>>>>>>>>> On 8 Jul 2024, at 16:37, Chris Egerton
> <chr...@aiven.io.INVALID
> > >>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Can you clarify why broker-side errors shouldn't be covered?
> > The
> > >>>>>>> only
> > >>>>>>>>>> real
> > >>>>>>>>>>> rationale I can come up with is that it's easier to
> implement.
> > >>>>>>>>>>>
> > >>>>>>>>>>> "Things were better for Kafka Streams before KAFKA-9279 was
> > >>>>> fixed"
> > >>>>>>>>> isn't
> > >>>>>>>>>>> very convincing, because Kafka Streams is not the only user
> of
> > >>>>> the
> > >>>>>>>> Java
> > >>>>>>>>>>> producer client. And for others, especially new users, I
> doubt
> > >>>>>>> that
> > >>>>>>>>> this
> > >>>>>>>>>>> new API we're proposing would make sense without having to
> > >>>>>>> consult a
> > >>>>>>>>> lot
> > >>>>>>>>>> of
> > >>>>>>>>>>> historical context.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I also don't think that most users will know or even care
> about
> > >>>>>>> the
> > >>>>>>>>>>> distinction between errors that cause a record to fail before
> > >>>>> it's
> > >>>>>>>>> added
> > >>>>>>>>>> to
> > >>>>>>>>>>> a batch vs. after. If you were writing a producer application
> > of
> > >>>>>>> your
> > >>>>>>>>>> own,
> > >>>>>>>>>>> and you wanted to handle RecordTooLargeException instances by
> > >>>>>>>> dropping
> > >>>>>>>>> a
> > >>>>>>>>>>> record without aborting a transaction, would you care about
> > >>>>>>> whether
> > >>>>>>>> it
> > >>>>>>>>>> was
> > >>>>>>>>>>> your client or your broker that balked? Would you be happy if
> > >>>>> you
> > >>>>>>>> wrote
> > >>>>>>>>>>> logic expecting that that problem was solved once and for
> all,
> > >>>>>>> only
> > >>>>>>>> to
> > >>>>>>>>>>> learn that it could still affect you in other circumstances?
> > Or,
> > >>>>>>>>>>> alternatively, would you be happy if you wanted to solve that
> > >>>>>>> problem
> > >>>>>>>>> and
> > >>>>>>>>>>> found an API that seemed to do exactly what you wanted, but
> > >>>>> after
> > >>>>>>>>> reading
> > >>>>>>>>>>> the fine print, realized you'd have to do it yourself
> instead?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Ultimately, the more I think about this, the more I believe
> > that
> > >>>>>>>> we're
> > >>>>>>>>>>> adding noise to the API (with the new overloaded variant of
> > >>>>> send)
> > >>>>>>>> for a
> > >>>>>>>>>>> feature that will likely bring confusion and even frustration
> > to
> > >>>>>>>> anyone
> > >>>>>>>>>>> besides maintainers of Kafka Streams who tries to use it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> If the only concern about covering broker-side errors is that
> > it
> > >>>>>>>> would
> > >>>>>>>>> be
> > >>>>>>>>>>> more difficult to implement, I believe we should strongly
> > >>>>>>> reconsider
> > >>>>>>>>> that
> > >>>>>>>>>>> alternative. That said, if there is a straightforward way to
> > >>>>>>> explain
> > >>>>>>>>> this
> > >>>>>>>>>>> feature to new users that won't mislead them or require them
> to
> > >>>>> do
> > >>>>>>>>>> research
> > >>>>>>>>>>> on producer internals, then I can still live with it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Regarding a list of recoverable vs. irrecoverable errors,
> this
> > >>>>> is
> > >>>>>>>>>> actually
> > >>>>>>>>>>> the subject of another recently-introduced KIP:
> > >>>>>>>>>>>
> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
> > >>>>>>>>>>>
> > >>>>>>>>>>> Finally, I'd also like to ask the people who have already
> voted
> > >>>>>>>>> (Andrew,
> > >>>>>>>>>>> Matthias) if, at the time they voted, they believed that the
> > API
> > >>>>>>>> would
> > >>>>>>>>>>> handle all errors, or only the subset of errors that would
> > >>>>> cause a
> > >>>>>>>>> record
> > >>>>>>>>>>> to be rejected from a batch before it can be sent to a
> broker.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Chris
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Jul 4, 2024 at 12:43 PM Alieh Saeedi
> > >>>>>>>>>> <asae...@confluent.io.invalid>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Salut from the KIP’s author
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Clarifying two points:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1) broker side errors:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> As far as I remember we are not going to cover the errors
> > >>>>>>>> originating
> > >>>>>>>>>> from
> > >>>>>>>>>>>> the broker!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> A historical fact: One of the debate points in KIP-1038 was
> > >>>>> that
> > >>>>>>> by
> > >>>>>>>>>>>> defining a producer custom handler, the user may assume that
> > >>>>>>>>> broker-side
> > >>>>>>>>>>>> errors must be covered as well. They may define a handler
> for
> > >>>>>>>> handling
> > >>>>>>>>>>>> `RecordTooLargeException` and still see such errors not
> being
> > >>>>>>>> handled
> > >>>>>>>>> as
> > >>>>>>>>>>>> they wish.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2) Regarding irrecoverable/recoverable errors:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Before the fix of `KAFKA-9279`, errors such as `RecordTooLargeException`
> > >>>>>>>>>>>> or errors related to missing metadata (both originating from Producer
> > >>>>>>>>>>>> `send()`) were considered recoverable, but after that they became
> > >>>>>>>>>>>> irrecoverable without changing any Javadocs or having any KIP. All
> > >>>>>>>>>>>> the effort made in this KIP and the former one has been towards
> > >>>>>>>>>>>> returning to the former state.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I am sure it is clear to you which sort of errors we are going to
> > >>>>>>>>>>>> cover: a single record may NOT get added to the batch due to
> > >>>>>>>>>>>> issues with the record or its corresponding topic. The point was
> > >>>>>>>>>>>> that if the record is not added to the batch, let's not fail the
> > >>>>>>>>>>>> whole batch because of that non-existing record. We never intended
> > >>>>>>>>>>>> to do something on the broker side or ignore more important errors.
> > >>>>>>>>>>>> But I agree with you, Chris. If we are adding a new API, we must
> > >>>>>>>>>>>> have good documentation for that. The sentence `all irrecoverable
> > >>>>>>>>>>>> transactional errors will still be fatal` as you suggested is good.
> > >>>>>>>>>>>> What do you think? I am totally against enumerating errors in
> > >>>>>>>>>>>> Javadocs since these sorts of errors can change over time.
> > >>>>>>>>>>>> Moreover, have you ever seen any list of recoverable or
> > >>>>>>>>>>>> irrecoverable errors somewhere so far?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Wed, Jul 3, 2024 at 6:07 PM Chris Egerton
> > >>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I agree that enumerating a list of errors that should be
> > >>>>>>> covered by
> > >>>>>>>>> the
> > >>>>>>>>>>>> KIP
> > >>>>>>>>>>>>> is difficult; I was thinking it might be easier if we list
> > the
> > >>>>>>>> errors
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>> should _not_ be covered by the KIP, and only if we can't
> > >>>>> define
> > >>>>>>> a
> > >>>>>>>>>>>>> reasonable heuristic that would cover them without having
> to
> > >>>>>>>>> explicitly
> > >>>>>>>>>>>>> list them. Could it be enough to say "all irrecoverable
> > >>>>>>>> transactional
> > >>>>>>>>>>>>> errors will still be fatal", or even just "all
> transactional
> > >>>>>>> errors
> > >>>>>>>>> (as
> > >>>>>>>>>>>>> opposed to errors related to this specific record) will
> still
> > >>>>> be
> > >>>>>>>>>> fatal"?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, Jul 3, 2024 at 11:56 AM Justine Olshan
> > >>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think what you say makes sense. I agree that defining
> the
> > >>>>>>>> behavior
> > >>>>>>>>>>>>> based
> > >>>>>>>>>>>>>> on code that can possibly change is not a good idea, and I
> > >>>>> was
> > >>>>>>>>> trying
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>> get a clearer definition from the KIP's author :)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think it can always be hard to ensure that only specific
> > >>>>>>> errors
> > >>>>>>>>> are
> > >>>>>>>>>>>>>> handled unless they are explicitly enumerated in code as
> the
> > >>>>>>> code
> > >>>>>>>>> can
> > >>>>>>>>>>>>>> change and can be changed by folks who are not aware of
> this
> > >>>>>>> KIP
> > >>>>>>>> or
> > >>>>>>>>>>>>>> conversation.
> > >>>>>>>>>>>>>> I personally don't have the bandwidth to do this
> > >>>>>>>>>> definition/enumeration
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>> errors, so hopefully Alieh can expand upon this.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, Jul 3, 2024 at 8:28 AM Chris Egerton
> > >>>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I don't love defining the changes for this KIP in terms
> of
> > a
> > >>>>>>>> catch
> > >>>>>>>>>>>>> clause
> > >>>>>>>>>>>>>>> in the KafkaProducer class, for two reasons. First, the
> set
> > >>>>> of
> > >>>>>>>>> errors
> > >>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>> are handled by that clause may shift over time as the
> code
> > >>>>>>> base
> > >>>>>>>> is
> > >>>>>>>>>>>>>>> modified, and second, it would be fairly opaque to users
> > who
> > >>>>>>> want
> > >>>>>>>>> to
> > >>>>>>>>>>>>>>> understand whether an error would be affected by using
> this
> > >>>>>>> API
> > >>>>>>>> or
> > >>>>>>>>>>>> not.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> It also seems strange that we'd handle some types of
> > >>>>>>>>>>>>>>> RecordTooLargeException (i.e., ones reported client-side)
> > >>>>> with
> > >>>>>>>> this
> > >>>>>>>>>>>>> API,
> > >>>>>>>>>>>>>>> but not others (i.e., ones reported by a broker).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think this kind of API would be most powerful, most
> > >>>>>>> intuitive
> > >>>>>>>> to
> > >>>>>>>>>>>>> users,
> > >>>>>>>>>>>>>>> and easiest to document if we expanded the scope to all
> > >>>>>>>>>>>>>> record-send-related
> > >>>>>>>>>>>>>>> errors, except anything indicating issues with
> exactly-once
> > >>>>>>>>>>>> semantics.
> > >>>>>>>>>>>>>> That
> > >>>>>>>>>>>>>>> would include records that are too large (when caught
> both
> > >>>>>>>> client-
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>> server-side), records that can't be sent due to
> > >>>>> authorization
> > >>>>>>>>>>>> failures,
> > >>>>>>>>>>>>>>> records sent to nonexistent topics/topic partitions, and
> > >>>>>>> keyless
> > >>>>>>>>>>>>> records
> > >>>>>>>>>>>>>>> sent to compacted topics. It would not include
> > >>>>>>>>>>>>>>> ProducerFencedException, InvalidProducerEpochException,
> > >>>>>>>>>>>>>>> UnsupportedVersionException,
> > >>>>>>>>>>>>>>> and possibly others.
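
The scope proposed above (everything record-related is ignorable, except errors that signal a problem with exactly-once semantics) could be sketched as a deny-list check. The class and method names are hypothetical, exceptions are matched by simple class name only to keep the sketch free of the Kafka dependency, and the excluded set is illustrative rather than complete, since the message itself leaves it open with "possibly others".

```java
import java.util.Set;

public class SendErrorScopeSketch {

    // Exception names listed as excluded in the message above; that list
    // ends with "possibly others", so this set is illustrative, not complete.
    static final Set<String> EXCLUDED = Set.of(
            "ProducerFencedException",
            "InvalidProducerEpochException",
            "UnsupportedVersionException");

    /** Returns true if an error with this simple class name would be ignorable for one record. */
    static boolean isIgnorable(String exceptionSimpleName) {
        return !EXCLUDED.contains(exceptionSimpleName);
    }
}
```

A deny-list like this treats any unlisted error as ignorable by default, which is exactly the risk raised elsewhere in the thread: code changes could add new error types that silently fall on the ignorable side.
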
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Justine -- do you think it would be possible to develop
> > >>>>>>> either a
> > >>>>>>>>>>>>> better
> > >>>>>>>>>>>>>>> definition for the kinds of "excluded" errors that should
> > >>>>> not
> > >>>>>>> be
> > >>>>>>>>>>>>> covered
> > >>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>> this API, or, barring that, a comprehensive list of exact
> > >>>>>>> error
> > >>>>>>>>>>>> types?
> > >>>>>>>>>>>>>> And
> > >>>>>>>>>>>>>>> do you think this would be acceptable in terms of risk
> and
> > >>>>>>>>>>>> complexity?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 5:05 PM Alieh Saeedi
> > >>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> About the consequences: the consequences will be like
> when
> > >>>>> we
> > >>>>>>>> did
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>> the fix made in `KAFKA-9279`: silent loss of data!
> > >>>>> Obviously,
> > >>>>>>>> when
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>> intentionally chose to ignore errors, that would not be
> > >>>>>>> silent
> > >>>>>>>> any
> > >>>>>>>>>>>>>> more.
> > >>>>>>>>>>>>>>>> Right?
> > >>>>>>>>>>>>>>>> Of course, considering all types of `ApiException`s
> would
> > >>>>> be
> > >>>>>>> too
> > >>>>>>>>>>>>> broad.
> > >>>>>>>>>>>>>>> But
> > >>>>>>>>>>>>>>>> are the exceptions caught in `catch(ApiException e)` of
> > the
> > >>>>>>>>>>>>> `doSend()`
> > >>>>>>>>>>>>>>>> method also too broad?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> -Alieh
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 9:45 PM Justine Olshan
> > >>>>>>>>>>>>>>> <jols...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If we want to allow any error to be ignored we should
> > >>>>>>> probably
> > >>>>>>>>>>>> run
> > >>>>>>>>>>>>>>>> through
> > >>>>>>>>>>>>>>>>> all the errors to make sure they make sense.
> > >>>>>>>>>>>>>>>>> I just want to feel confident that we aren't just
> making
> > a
> > >>>>>>>>>>>> decision
> > >>>>>>>>>>>>>>>> without
> > >>>>>>>>>>>>>>>>> considering the consequences carefully.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:30 PM Alieh Saeedi
> > >>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hey Justine,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> yes we talked about `RecordTooLargeException` as an
> > >>>>>>> example,
> > >>>>>>>>>>>> but
> > >>>>>>>>>>>>>> did
> > >>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> ever limit ourselves to only this specific exception?
> I
> > >>>>>>> think
> > >>>>>>>>>>>>>> neither
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> the KIP nor in the PR.  As Chris mentioned, this KIP
> is
> > >>>>>>> going
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>> undo
> > >>>>>>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>>>>> we have done in `KAFKA-9279` in case 1) the user is
> in a
> > >>>>>>>>>>>>>> transaction
> > >>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>> 2)
> > >>>>>>>>>>>>>>>>>> he decides to ignore the errors in which the record
> was
> > >>>>> not
> > >>>>>>>>>>>> even
> > >>>>>>>>>>>>>>> added
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> the batch. Yes, and we suggested some methods for
> > undoing
> > >>>>>>> or,
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>> fact,
> > >>>>>>>>>>>>>>>>>> moving back the transaction from the error state in
> > >>>>>>> `flush` or
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>> `commitTnx` and we finally came to the idea of not
> even
> > >>>>>>> doing
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> changes
> > >>>>>>>>>>>>>>>>>> (better than undoing) in `send`.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 8:03 PM Justine Olshan
> > >>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hey folks,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I understand where you are coming from by asking for
> > >>>>>>> specific
> > >>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>> cases.
> > >>>>>>>>>>>>>>>>>> My
> > >>>>>>>>>>>>>>>>>>> understanding based on previous conversations was
> that
> > >>>>>>> there
> > >>>>>>>>>>>>>> were a
> > >>>>>>>>>>>>>>>> few
> > >>>>>>>>>>>>>>>>>>> different errors that have been seen.
> > >>>>>>>>>>>>>>>>>>> One example I heard some information about was when
> the
> > >>>>>>>>>>>> record
> > >>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>> too
> > >>>>>>>>>>>>>>>>>>> large and it fails the batch. Besides that, I'm not
> > >>>>> really
> > >>>>>>>>>>>> sure
> > >>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>>>>> are cases in mind, though it is fair to ask on those
> > and
> > >>>>>>>>>>>> bring
> > >>>>>>>>>>>>>> them
> > >>>>>>>>>>>>>>>> up.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Does a record qualify as a poison pill if it
> targets a
> > >>>>>>>>>>>> topic
> > >>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> doesn't exist? Or if it targets a topic that the
> > >>>>> producer
> > >>>>>>>>>>>>>> principal
> > >>>>>>>>>>>>>>>>> lacks
> > >>>>>>>>>>>>>>>>>>> ACLs for? What if it fails broker-side validation
> > (e.g.,
> > >>>>>>> has
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>> null
> > >>>>>>>>>>>>>>>> key
> > >>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>> a compacted topic)?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I think there was some parallel work with addressing
> > the
> > >>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionError in another way. As for
> the
> > >>>>>>> other
> > >>>>>>>>>>>>>>> checks,
> > >>>>>>>>>>>>>>>>>> acls,
> > >>>>>>>>>>>>>>>>>>> validation etc. I am not aware of that being in
> Alieh's
> > >>>>>>>>>>>> scope,
> > >>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>> should be clear about exactly what we are doing.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> All errors that fall into ApiException seem too
> broad
> > >>>>> to
> > >>>>>>> me.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 10:51 AM Alieh Saeedi
> > >>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hey Chris,
> > >>>>>>>>>>>>>>>>>>>> thanks for sharing your concerns.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 1) About the language of KIP (or maybe later in
> > >>>>>>> Javadocs):
> > >>>>>>>>>>>> Is
> > >>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> alright
> > >>>>>>>>>>>>>>>>>>>> if I write all errors that fall into the
> > `ApiException`
> > >>>>>>>>>>>>>> category
> > >>>>>>>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>>>>> (actually returned) by Producer?
> > >>>>>>>>>>>>>>>>>>>> 2) About future expansion: do you have any better
> > >>>>>>>>>>>> suggestions
> > >>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>> enum
> > >>>>>>>>>>>>>>>>>>>> names? Do you think `IGNORE_API_EXCEPTIONS` or
> > something
> > >>>>>>>>>>>> like
> > >>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>> "better/more accurate" one?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 7:29 PM Chris Egerton
> > >>>>>>>>>>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Alieh and Justine,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm concerned that we're settling on a definition
> of
> > >>>>>>>>>>>>> "poison
> > >>>>>>>>>>>>>>>> pill"
> > >>>>>>>>>>>>>>>>>>> that's
> > >>>>>>>>>>>>>>>>>>>>> easiest to tackle right now but may lead to
> > >>>>> shortcomings
> > >>>>>>>>>>>>> down
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> road. I
> > >>>>>>>>>>>>>>>>>>>>> understand the relationship between this KIP and
> > >>>>>>>>>>>>> KAFKA-9279,
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>> totally get behind the desire to keep things small,
> > >>>>>>>>>>>>> focused,
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> simple
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> the name of avoiding bugs. However, what I don't
> > think
> > >>>>>>> is
> > >>>>>>>>>>>>>> clear
> > >>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> what the "specific circumstances" are that Justine
> > >>>>>>>>>>>>>> mentioned. I
> > >>>>>>>>>>>>>>>>> had a
> > >>>>>>>>>>>>>>>>>>>>> drastically different idea of what the intended
> > >>>>>>>>>>>> behavioral
> > >>>>>>>>>>>>>>> change
> > >>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>> before looking at the draft PR.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I would like 1) for us to be clearer about the
> > >>>>>>> categories
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>> want to cover with this new API (especially since
> > >>>>> we'll
> > >>>>>>>>>>>>> have
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> find
> > >>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>> clear, succinct way to document this for users),
> and
> > >>>>> 2)
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>> make
> > >>>>>>>>>>>>>>>>> sure
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> if we do try to expand this API in the future, that
> > we
> > >>>>>>>>>>>>> won't
> > >>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>> painted
> > >>>>>>>>>>>>>>>>>>>>> into a corner.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> For item 1, hopefully we can agree that the
> language
> > >>>>> in
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>>>>> for IGNORE_SEND_ERRORS ("The records causing
> > >>>>>>>>>>>> irrecoverable
> > >>>>>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>> excluded from the batch and the transaction is
> > >>>>> committed
> > >>>>>>>>>>>>>>>>>>> successfully.")
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> pretty vague. If we start using the phrase "poison
> > >>>>> pill
> > >>>>>>>>>>>>>> record"
> > >>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>>> help, but IMO more detail would still be needed. We
> > >>>>> know
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> include records that are so large that they can be
> > >>>>>>>>>>>>>> immediately
> > >>>>>>>>>>>>>>>>>> rejected
> > >>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>> the producer. But there are other cases that users
> > >>>>> might
> > >>>>>>>>>>>>>> expect
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>> handled. Does a record qualify as a poison pill if
> it
> > >>>>>>>>>>>>>> targets a
> > >>>>>>>>>>>>>>>>> topic
> > >>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>> doesn't exist? Or if it targets a topic that the
> > >>>>>>> producer
> > >>>>>>>>>>>>>>>> principal
> > >>>>>>>>>>>>>>>>>>> lacks
> > >>>>>>>>>>>>>>>>>>>>> ACLs for? What if it fails broker-side validation
> > >>>>> (e.g.,
> > >>>>>>>>>>>>> has
> > >>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>> null
> > >>>>>>>>>>>>>>>>>> key
> > >>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>> a compacted topic)?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> For item 2, this really depends on how narrow the
> > >>>>> scope
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>>>> we're
> > >>>>>>>>>>>>>>>>>>>> doing
> > >>>>>>>>>>>>>>>>>>>>> right now is. If we only handle a subset of the
> > >>>>> examples
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>> laid
> > >>>>>>>>>>>>>>>> out
> > >>>>>>>>>>>>>>>>>>> above
> > >>>>>>>>>>>>>>>>>>>>> that could possibly be considered poison pills with
> > >>>>> this
> > >>>>>>>>>>>>> KIP,
> > >>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>> want
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>> lock ourselves in to never addressing more in the
> > >>>>>>> future,
> > >>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>> choose
> > >>>>>>>>>>>>>>>>>>>>> an API (probably just enum names would be the only
> > >>>>>>>>>>>>> important
> > >>>>>>>>>>>>>>>>> decision
> > >>>>>>>>>>>>>>>>>>>> here)
> > >>>>>>>>>>>>>>>>>>>>> that leaves room for more later?
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Tue, Jul 2, 2024 at 12:28 PM Justine Olshan
> > >>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Chris and Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> My understanding is that this KIP is really only
> > >>>>> trying
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> solve
> > >>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>>>>>>>> of a "poison pill" record that fails send().
> > >>>>>>>>>>>>>>>>>>>>>> We've talked a lot about having a generic
> framework
> > >>>>> for
> > >>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>> errors,
> > >>>>>>>>>>>>>>>>>>>> but I
> > >>>>>>>>>>>>>>>>>>>>>> don't think that is what this KIP is trying to do.
> > >>>>>>>>>>>>>>> Essentially
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> request
> > >>>>>>>>>>>>>>>>>>>>>> is to undo the change from KAFKA-9279
> > >>>>>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9279
> >
> > >>>>> but
> > >>>>>>>>>>>>>> under
> > >>>>>>>>>>>>>>>>>>> specific
> > >>>>>>>>>>>>>>>>>>>>>> circumstances that are controlled. I really am
> > >>>>>>>>>>>> concerned
> > >>>>>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>>>>> opening
> > >>>>>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>> avenues for bugs with EOS and hesitate to handle
> any
> > >>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>> types
> > >>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>> errors.
> > >>>>>>>>>>>>>>>>>>>>>> I think if we all agree on the problem that we are
> > >>>>>>>>>>>> trying
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> solve,
> > >>>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>> easier to agree on solutions.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Mon, Jul 1, 2024 at 2:20 AM Alieh Saeedi
> > >>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the valid points you mentioned. I
> > updated
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> PR
> > >>>>>>>>>>>>>>>>>>>>>>> with:
> > >>>>>>>>>>>>>>>>>>>>>>> 1) mentioning that the new overloaded `send`
> throws
> > >>>>>>>>>>>>>>>>>>>>>> `IllegalStateException`
> > >>>>>>>>>>>>>>>>>>>>>>> if the user tries to ignore `send()` errors
> outside
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>> transaction.
> > >>>>>>>>>>>>>>>>>>>>>>> 2) the default implementation in `Producer`
> > >>>>> interface
> > >>>>>>>>>>>>>>> throws
> > >>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>> `UnsupportedOperationException`
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Chris,
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. I tried to clarify the
> > >>>>>>>>>>>> points
> > >>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>> listed:
> > >>>>>>>>>>>>>>>>>>>>>>> -------> we've narrowed the scope from any error
> > >>>>> that
> > >>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>> take
> > >>>>>>>>>>>>>>>>>>>> place
> > >>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>> producing a record to Kafka, to only the ones
> that
> > >>>>>>>>>>>> are
> > >>>>>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>>>>> directly
> > >>>>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>> Producer::send;
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>   From the very beginning and even since
> KIP-1038,
> > >> the
> > >>>>>>>>>>>>> main
> > >>>>>>>>>>>>>>>>> purpose
> > >>>>>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> have "more flexibility," or, in other words,
> > "giving
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> authority" to handle some specific exceptions
> > thrown
> > >>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> `Producer`.
> > >>>>>>>>>>>>>>>>>>>>>>> Due to the specific cases we had in mind,
> KIP-1038
> > >>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>> discarded
> > >>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>> decided to not define a `CustomExceptionHandler`
> > for
> > >>>>>>>>>>>>>>>> `Producer`
> > >>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>> instead
> > >>>>>>>>>>>>>>>>>>>>>>> treat the `send` failures in a different way. The
> > >>>>>>>>>>>> main
> > >>>>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>>>>>> makes a transition to error state, which is
> > >>>>> undoable.
> > >>>>>>>>>>>>> In
> > >>>>>>>>>>>>>>>> fact,
> > >>>>>>>>>>>>>>>>>> one
> > >>>>>>>>>>>>>>>>>>>>> single
> > >>>>>>>>>>>>>>>>>>>>>>> poison pill record makes the whole batch fail.
> The
> > >>>>>>>>>>>>> former
> > >>>>>>>>>>>>>>>>>>> suggestions
> > >>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>> you agreed with have been all about un-doing this
> > >>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> `flush`
> > >>>>>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>>> `commit`. The new suggestion is to un-do (or
> > better,
> > >>>>>>>>>>>>> NOT
> > >>>>>>>>>>>>>>> do)
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>>>>> due
> > >>>>>>>>>>>>>>>>>>>>>>> to the reasons listed in the discussions above.
> > >>>>>>>>>>>>>>>>>>>>>>> Moreover, I would say that having such a large
> > scope
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>> mentioned
> > >>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>> impossible. In the best case, we may have control
> > >>>>>>>>>>>> over
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> `Producer`.
> > >>>>>>>>>>>>>>>>>>>>>> What
> > >>>>>>>>>>>>>>>>>>>>>>> shall we do with the broker? The `any error that
> > >>>>>>>>>>>> might
> > >>>>>>>>>>>>>> take
> > >>>>>>>>>>>>>>>>> place
> > >>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>> producing a record to Kafka` is too much, I
> think.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> -------> is this all we want to handle, and will
> it
> > >>>>>>>>>>>>>> prevent
> > >>>>>>>>>>>>>>>> us
> > >>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>> handling more in the future in an intuitive way?
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I think yes. This is all we want. Other sorts of
> > >>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>> such
> > >>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>> having
> > >>>>>>>>>>>>>>>>>>>>>>> problems with partition addition, producer fenced
> > >>>>>>>>>>>>>> exception,
> > >>>>>>>>>>>>>>>> etc
> > >>>>>>>>>>>>>>>>>>> seem
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>> more serious issues. The intention was to handle
> > >>>>>>>>>>>>> problems
> > >>>>>>>>>>>>>>>>> created
> > >>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>> (maybe) a single poison pill record. BTW, I do
> not
> > >>>>>>>>>>>> see
> > >>>>>>>>>>>>>> any
> > >>>>>>>>>>>>>>>>>>> obstacles
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> future changes.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Sat, Jun 29, 2024 at 3:03 AM Chris Egerton
> > >>>>>>>>>>>>>>>>>>>> <chr...@aiven.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Ah, sorry--spoke too soon. The PR doesn't show
> > that
> > >>>>>>>>>>>>>>> errors
> > >>>>>>>>>>>>>>>>>> thrown
> > >>>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>>> Producer::send are handled, but instead,
> > >>>>>>>>>>>> ApiException
> > >>>>>>>>>>>>>>>>> instances
> > >>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>>>>> caught inside KafkaProducer::doSend and are
> > handled
> > >>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>> returning
> > >>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>> already-failed future are. I think the same
> > >>>>>>>>>>>> question
> > >>>>>>>>>>>>>>> still
> > >>>>>>>>>>>>>>>>>>> applies
> > >>>>>>>>>>>>>>>>>>>>> (is
> > >>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>> all we want to handle, and will it prevent us
> from
> > >>>>>>>>>>>>>>> handling
> > >>>>>>>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> future in an intuitive way), though.
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 8:57 PM Chris Egerton <
> > >>>>>>>>>>>>>>>>> chr...@aiven.io
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> This KIP has evolved a lot since I last looked
> at
> > >>>>>>>>>>>>> it,
> > >>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> changes
> > >>>>>>>>>>>>>>>>>>>>>>>> seem
> > >>>>>>>>>>>>>>>>>>>>>>>>> well thought-out both in semantics and API. One
> > >>>>>>>>>>>>>>>> clarifying
> > >>>>>>>>>>>>>>>>>>>>> question I
> > >>>>>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>> is that it looks based on the draft PR that
> we've
> > >>>>>>>>>>>>>>>> narrowed
> > >>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>> scope
> > >>>>>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>>>> any error that might take place with producing
> a
> > >>>>>>>>>>>>>> record
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>> Kafka,
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>>>> the ones that are thrown directly from
> > >>>>>>>>>>>>>> Producer::send;
> > >>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> intended
> > >>>>>>>>>>>>>>>>>>>>>>>>> behavior here? And if so, do you have thoughts
> on
> > >>>>>>>>>>>>> how
> > >>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>>>>>>>>> design a
> > >>>>>>>>>>>>>>>>>>>>>>>>> follow-up KIP that would catch all errors
> > >>>>>>>>>>>>> (including
> > >>>>>>>>>>>>>>> ones
> > >>>>>>>>>>>>>>>>>>>> reported
> > >>>>>>>>>>>>>>>>>>>>>>>>> asynchronously instead of synchronously)? I'd
> > >>>>>>>>>>>> like
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>>>> leave
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>> door open for that without painting ourselves
> > >>>>>>>>>>>> into
> > >>>>>>>>>>>>>> too
> > >>>>>>>>>>>>>>>> much
> > >>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>> corner
> > >>>>>>>>>>>>>>>>>>>>>>>>> with the API design for this KIP.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jun 28, 2024 at 6:31 PM Matthias J.
> Sax <
> > >>>>>>>>>>>>>>>>>>>> mj...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> it seems this KIP can just pick between a
> couple
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> tradeoffs.
> > >>>>>>>>>>>>>>>>>>>>>> Adding
> > >>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>> overloaded `send()` as the KIP proposes makes
> > >>>>>>>>>>>> sense
> > >>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> me
> > >>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> provide the cleanest solution compared to
> the
> > >>>>>>>>>>>>>>> options
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>> discussed.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Given the explicit name of the passed-in
> option
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>> highlights
> > >>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> option is for TX only makes it pretty clear and
> > >>>>>>>>>>>>>> avoids
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> issue
> > >>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> `flush()` ambiguity.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Nit: We should make clear on the KIP though,
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>> `send()`
> > >>>>>>>>>>>>>>>>>>>>>>>>>> overload would throw an
> `IllegalStateException`
> > >>>>>>>>>>>> if
> > >>>>>>>>>>>>>> TX
> > >>>>>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>> used
> > >>>>>>>>>>>>>>>>>>>>>>>>>> (similar to other TX methods like initTx(),
> etc)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> About the `Producer` interface, I am not sure
> > >>>>>>>>>>>> how
> > >>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>> done
> > >>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> past (eg, KIP-266 added
> > >>>>>>>>>>>> `Consumer.poll(Duration)`
> > >>>>>>>>>>>>>> w/o
> > >>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>> default
> > >>>>>>>>>>>>>>>>>>>>>>>>>> implementation), if we need a default
> > >>>>>>>>>>>>> implementation
> > >>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>> backward
> > >>>>>>>>>>>>>>>>>>>>>>>>>> compatibility or not? If we do want to add
> one,
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> appropriate to throw an
> > >>>>>>>>>>>>>>> `UnsupportedOperationException`
> > >>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>> default,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> instead of just keeping the default impl
> empty?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> My points are rather minor, and should not
> block
> > >>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>>>> though.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Overall LGTM.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> On 6/27/24 1:28 PM, Alieh Saeedi wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Making applications validate every single
> > >>>>>>>>>>>>>> record
> > >>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>> best
> > >>>>>>>>>>>>>>>>>>>>>>>> way,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> from an efficiency point of view.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover, between changing the behavior of
> the
> > >>>>>>>>>>>>>>>> Producer
> > >>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>> `send`
> > >>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> `commitTxn`, the former seems more reasonable
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> clean.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 8:14 PM Justine
> Olshan
> > >>>>>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I see there are two options now. So folks
> > >>>>>>>>>>>> will
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>> discussing
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> approaches
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> and deciding the best way forward before we
> > >>>>>>>>>>>>> vote?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I do think there could be a problem with the
> > >>>>>>>>>>>>>>> approach
> > >>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>> get
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> stuck on an earlier error and have more
> > >>>>>>>>>>>> records
> > >>>>>>>>>>>>>>>>>>> (potentially
> > >>>>>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions) to commit as the current PR is
> > >>>>>>>>>>>>>>>> implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> I guess this takes us back to the question
> of
> > >>>>>>>>>>>>>>> whether
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> cleared on send.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> (And I guess at the back of my mind, I'm
> > >>>>>>>>>>>>>> wondering
> > >>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>>>>>>>>>>>>> we can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> validate the "poison pill" records
> > >>>>>>>>>>>> application
> > >>>>>>>>>>>>>> side
> > >>>>>>>>>>>>>>>>>> before
> > >>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>> even
> > >>>>>>>>>>>>>>>>>>>>>>>> try
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> send them)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 4:38 PM Alieh Saeedi
> > >>>>>>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Justine,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I did not update the KIP with
> > >>>>>>>>>>>> `TxnSendOption`
> > >>>>>>>>>>>>>>> since
> > >>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>> thought
> > >>>>>>>>>>>>>>>>>>>>>> it'd
> > >>>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> better discussed here beforehand.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Right now, there are 2 PRs:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> - the PR that implements the current
> version
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> KIP:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16332
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> - the POC PR that clarifies the
> > >>>>>>>>>>>>> `TxnSendOption`:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16465
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bests,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 27, 2024 at 12:42 AM Justine
> > >>>>>>>>>>>>> Olshan
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think I am a little confused. Are the 3
> > >>>>>>>>>>>>>> points
> > >>>>>>>>>>>>>>>>> above
> > >>>>>>>>>>>>>>>>>>>>>> addressed
> > >>>>>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or did something change? The PR seems to
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>> include
> > >>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>> change
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> still
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> has the CommitOption as well.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 26, 2024 at 2:15 PM Alieh
> > >>>>>>>>>>>> Saeedi
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking at the PR <
> > >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/16332
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> corresponding to the KIP, there are some
> > >>>>>>>>>>>>>> points
> > >>>>>>>>>>>>>>>>> worthy
> > >>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>> mention:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) clearing the error results in dirty/messy
> > >>>>>>>>>>>>> code
> > >>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> `TransactionManager`.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) By clearing the error, we are actually
> > >>>>>>>>>>>>>> doing
> > >>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>> illegal
> > >>>>>>>>>>>>>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `ABORTABLE_ERROR` to `IN_TRANSACTION`
> > >>>>>>>>>>>> which
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>> conceptually
> > >>>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> acceptable.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This can be the root cause of some
> issues,
> > >>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>> perhaps
> > >>>>>>>>>>>>>>>>>>>>>> further
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> future
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> changes by others.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) If the poison pill record `r1` causes
> a
> > >>>>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> state
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and then the next record `r2` requires
> > >>>>>>>>>>>>> adding
> > >>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>> partition
> > >>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction, the action fails due to
> being
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>> state.
> > >>>>>>>>>>>>>>>>>>>>>>> In
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> case, clearing errors during
> > >>>>>>>>>>>>>>>>>>> `commitTxn(CLEAR_SEND_ERROR)`
> > >>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>> too
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> late.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, this case can NOT be the main
> > >>>>>>>>>>>>> concern
> > >>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>> soon
> > >>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>> KIP-890
> > >>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fully
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My suggestion is to solve the problem
> > >>>>>>>>>>>> where
> > >>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>> arises.
> > >>>>>>>>>>>>>>>>>>> If
> > >>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> transition
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the error state does not happen during
> > >>>>>>>>>>>>>> `send()`,
> > >>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>> do
> > >>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>> need
> > >>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> clear
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the error later. Therefore, instead of
> > >>>>>>>>>>>>>>>>> `CommitOption`,
> > >>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>> define
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `TxnSendOption` and prevent the `send()`
> > >>>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>>> going
> > >>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state in case 1) we're in a transaction
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>> 2)
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>>> asked
> > >>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> IGNORE_SEND_ERRORS. For more clarity, you
> > >>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>> take a
> > >>>>>>>>>>>>>>>>>>> look
> > >>>>>>>>>>>>>>>>>>>> at
> > >>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> POC
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> PR
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
> > >>>>>>>>>>>> https://github.com/apache/kafka/pull/16465
> > >>>>>>>>>>>>>> .
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Alieh
