Hi Alieh,
Just one final comment.

[AJS5] Existing classes use Retriable, not Retryable. For example:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html

I suggest RetriableResponse and NonRetriableResponse.
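
To illustrate the naming, here is a rough sketch of how the handler could look with those names. This is not the KIP's actual code: the exception and record classes below are hypothetical stand-ins so that the sketch compiles without the Kafka client, and the FAIL/SWALLOW/RETRY constants and the default return values are my assumptions based on the discussion in this thread.

```java
import java.io.Closeable;

// Hypothetical stand-ins for the Kafka classes named in this thread,
// declared here only so the sketch compiles on its own.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

// Stand-in for ProducerRecord; only the topic is modelled.
class ProducerRecord {
    final String topic;
    ProducerRecord(String topic) { this.topic = topic; }
}

// For errors that can never succeed on retry: there is no RETRY constant,
// so the compiler rejects a handler that tries to return it.
enum NonRetriableResponse { FAIL, SWALLOW }

// For errors that may be transient, such as a topic that was just created.
enum RetriableResponse { FAIL, SWALLOW, RETRY }

// Assumed shape of the handler: one method per exception, each with a
// default implementation so an implementor overrides only what they need.
interface ProducerExceptionHandler extends Closeable {
    default NonRetriableResponse handle(RecordTooLargeException e, ProducerRecord record) {
        return NonRetriableResponse.FAIL;   // assumed default: fail, as today
    }

    default RetriableResponse handle(UnknownTopicOrPartitionException e, ProducerRecord record) {
        return RetriableResponse.RETRY;     // assumed default: keep retrying, as today
    }

    @Override
    default void close() { }
}

// Example implementation that drops oversized records and inherits the
// default behaviour for unknown topics or partitions.
class DropOversizedRecordsHandler implements ProducerExceptionHandler {
    @Override
    public NonRetriableResponse handle(RecordTooLargeException e, ProducerRecord record) {
        return NonRetriableResponse.SWALLOW;
    }
}
```

With this shape, a handler that overrides only the RecordTooLargeException method still gets the default response for UnknownTopicOrPartitionException.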

Thanks,
Andrew

> On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>
> Hi all,
>
>
> Thanks for all the valid points you listed.
>
>
> KIP updates and addressing concerns:
>
>
> 1) The KIP now suggests two Response types: `RetryableResponse` and
> `NonRetryableResponse`
>
>
> 2) `custom.exception.handler` is changed to `custom.exception.handler.class`
>
>
> 3) The KIP clarifies that `In the case of an implemented handler for the
> specified exception, the handler takes precedence.`
>
>
> 4)  There is now a `default` implementation for both handle() methods.
>
>
> 5)  @Chris: for `UnknownTopicOrPartition`, the default is already retrying
> for 60s. (In fact, the default value of `max.block.ms`). If the handler
> instructs to FAIL or SWALLOW, there will be no retry, and if the handler
> instructs to RETRY, that will be the default behavior, which follows the
> values in already existing config parameters such as `max.block.ms`. Does
> that make sense?
>
>
> Hope the changes and explanations are convincing :)
>
>
> Cheers,
>
> Alieh
>
> On Mon, May 13, 2024 at 6:40 PM Justine Olshan <jols...@confluent.io.invalid>
> wrote:
>
>> Oh I see. The type isn't the error type but a newly defined type for the
>> response. Makes sense and works for me.
>>
>> Justine
>>
>> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com>
>> wrote:
>>
>>> If we have dedicated methods for each kind of exception
>>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that
>>> provide sufficient constraint? I'm not suggesting we eliminate these
>>> methods, just that we change their return types to something more
>> flexible.
>>>
>>> On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid
>>>
>>> wrote:
>>>
>>>> I'm not sure I agree with the Retriable and NonRetriableResponse
>> comment.
>>>> This doesn't limit the blast radius or enforce certain errors are used.
>>>> I think we might disagree on how controlled these interfaces can be...
>>>>
>>>> Justine
>>>>
>>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid
>>>
>>>> wrote:
>>>>
>>>>> Hi Alieh,
>>>>>
>>>>> Thanks for the updates! I just have a few more thoughts:
>>>>>
>>>>> - I don't think a boolean property is sufficient to dictate retries
>> for
>>>>> unknown topic partitions, though. These errors can occur if a topic
>> has
>>>>> just been created, which can occur if, for example, automatic topic
>>>>> creation is enabled for a multi-task connector. This is why I
>> proposed
>>> a
>>>>> timeout instead of a boolean (and see my previous email for why
>>> reducing
>>>>> max.block.ms for a producer is not a viable alternative). If it
>> helps,
>>>> one
>>>>> way to reproduce this yourself is to add the line
>>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
>>>>>
>>>>> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
>>>>> and then check the logs afterward for messages like "Error while
>>> fetching
>>>>> metadata with correlation id <n> :
>>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
>>>>>
>>>>> - I also don't think we need custom XxxResponse enums for every
>>> possible
>>>>> method; it seems like this will lead to a lot of duplication and
>>>> cognitive
>>>>> overhead if we want to expand the error handler in the future.
>>> Something
>>>>> more flexible like RetriableResponse and NonRetriableResponse could
>>>>> suffice.
>>>>>
>>>>> - Finally, the KIP still doesn't state how the handler will or won't
>>> take
>>>>> precedence over existing retry properties. If I set `retries` or `
>>>>> delivery.timeout.ms` or `max.block.ms` to low values, will that
>> cause
>>>>> retries to cease even if my custom handler would otherwise keep
>>> returning
>>>>> RETRY for an error?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
>>>>> andrew_schofi...@live.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Alieh,
>>>>>> Just a few more comments on the KIP. It is looking much less risky
>>> now
>>>>> the
>>>>>> scope
>>>>>> is tighter.
>>>>>>
>>>>>> [AJS1] It would be nice to have default implementations of the
>> handle
>>>>>> methods
>>>>>> so an implementor would not need to implement both themselves.
>>>>>>
>>>>>> [AJS2] Producer configurations which are class names usually end in
>>>>>> “.class”.
>>>>>> I suggest “custom.exception.handler.class”.
>>>>>>
>>>>>> [AJS3] If I implemented a handler, and I set a non-default value
>> for
>>>> one
>>>>>> of the
>>>>>> new configurations, what happens? I would expect that the handler
>>> takes
>>>>>> precedence. I wasn’t quite clear what “the control will follow the
>>>>> handler
>>>>>> instructions” meant.
>>>>>>
>>>>>> [AJS4] Because you now have an enum for the
>>>>>> RecordTooLargeExceptionResponse,
>>>>>> I don’t think you need to state in the comment for
>>>>>> ProducerExceptionHandler that
>>>>>> RETRY will be interpreted as FAIL.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi
>>> <asae...@confluent.io.INVALID
>>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>>
>>>>>>> Thanks for the very interesting discussion during my PTO.
>>>>>>>
>>>>>>>
>>>>>>> KIP updates and addressing concerns:
>>>>>>>
>>>>>>>
>>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler
>> for
>>>> the
>>>>>> two
>>>>>>> exceptions with different input parameters so that we have
>>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
>>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
>>>>>>>
>>>>>>>
>>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well.
>>>>>>>
>>>>>>>
>>>>>>> 3) The KIP suggests having two more configuration parameters with
>>>>> boolean
>>>>>>> values:
>>>>>>>
>>>>>>> - `drop.invalid.large.records` with a default value of `false`
>> for
>>>>>>> swallowing too large records.
>>>>>>>
>>>>>>> - `retry.unknown.topic.partition` with a default value of `true`
>>> that
>>>>>>> performs RETRY for `max.block.ms` ms when encountering an
>>>>>>> UnknownTopicOrPartitionException.
>>>>>>>
>>>>>>>
>>>>>>> Hope the main concerns are addressed so that we can go forward
>> with
>>>>>> voting.
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Alieh
>>>>>>>
>>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Mathias,
>>>>>>>>
>>>>>>>>> [AL1] While I see the point, I would think having a different
>>>>> callback
>>>>>>>> for every exception might not really be elegant?
>>>>>>>>
>>>>>>>> I'm not sure how to assess the level of elegance of the
>> proposal,
>>>> but
>>>>> I
>>>>>> can
>>>>>>>> comment on the technical characteristics:
>>>>>>>>
>>>>>>>> 1. Having specific interfaces that codify the logic that is
>>>> currently
>>>>>>>> prescribed in the comments reduces the chance of making a
>> mistake.
>>>>>>>> Comments may get ignored, misunderstood, etc., but if the
>>> contract
>>>> is
>>>>>>>> codified, the compiler will help to enforce the contract.
>>>>>>>> 2. Given that the logic is trickier than it seems (the
>>>>> record-too-large
>>>>>> is
>>>>>>>> an example that can easily confuse someone who's not intimately
>>>>> familiar
>>>>>>>> with the nuances of the batching logic), having a few more
>>> hoops
>>>> to
>>>>>> jump through
>>>>>>>> would give a greater chance that whoever tries to add a new
>> case
>>>>> pauses
>>>>>>>> and thinks a bit more.
>>>>>>>> 3. As Justine pointed out, having different methods will be a
>>> forcing
>>>>>>>> function to go through a KIP rather than smuggle new cases
>> through
>>>>>>>> implementation.
>>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things
>>>> reduce
>>>>>> the
>>>>>>>> chance of someone writing the code that works with 2 errors and
>>> then
>>>>>> when
>>>>>>>> more errors are added in the future will suddenly incorrectly
>>> ignore
>>>>> new
>>>>>>>> errors (the example I gave in the previous email).
>>>>>>>>
>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent
>>> as
>>>>>>>> business logic. If a user puts a bad filter condition in their
>> KS
>>>> app,
>>>>>> and
>>>>>>>> drops messages
>>>>>>>>
>>>>>>>> I agree that there is always a chance to get a bug and lose
>>>> messages,
>>>>>> but
>>>>>>>> there is generally a separation of concerns with different
>> risk
>>>>>> profiles:
>>>>>>>> the filtering logic may be more rigorously tested and rarely
>>> changed
>>>>>> (say
>>>>>>>> an application developer does it), but setting the topics to
>>> produce
>>>>>> may be
>>>>>>>> done via configuration (e.g. a user of the application does it)
>>> and
>>>>> it's
>>>>>>>> generally an expectation that users would get an error when
>>>>>> configuration
>>>>>>>> is incorrect.
>>>>>>>>
>>>>>>>> What could be worse is that UnknownTopicOrPartitionException can
>>> be
>>>> an
>>>>>>>> intermittent error, i.e. with a generally correct configuration,
>>>> there
>>>>>>>> could be a metadata propagation problem on the cluster and then a
>>>> random
>>>>>> set
>>>>>>>> of records could get lost.
>>>>>>>>
>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
>>>> checking
>>>>>> the
>>>>>>>> size of the record upfront is exactly what the KIP proposes? No?
>>>>>>>>
>>>>>>>> It achieves the same result but solves it differently, my
>>> proposal:
>>>>>>>>
>>>>>>>> 1. Application checks the validity of a record (maybe via a new
>>>>>>>> validateRecord method) before producing it, and can just exclude
>>> it
>>>> or
>>>>>>>> return an error to the user.
>>>>>>>> 2. Application produces the record -- at this point there are no
>>>>> records
>>>>>>>> that could return record too large, they were either skipped at
>>>> step 1
>>>>>> or
>>>>>>>> we didn't get here because step 1 failed.
>>>>>>>>
>>>>>>>> Vs. KIP's proposal
>>>>>>>>
>>>>>>>> 1. Application produces the record.
>>>>>>>> 2. Application gets a callback.
>>>>>>>> 3. Application returns the action on how to proceed.
>>>>>>>>
>>>>>>>> The advantage of the former is the clarity of semantics -- the
>>>> record
>>>>> is
>>>>>>>> invalid (property of the record, not a function of server state
>> or
>>>>>> server
>>>>>>>> configuration) and we can clearly know that it is the record
>> that
>>> is
>>>>> bad
>>>>>>>> and can never succeed.
>>>>>>>>
>>>>>>>> The KIP-proposed way actually has a very tricky point: it
>> actually
>>>>>> handles
>>>>>>>> a subset of record-too-large exceptions.  The broker can return
>>>>>>>> record-too-large and reject the whole batch (but we don't want
>> to
>>>>> ignore
>>>>>>>> those because then we can skip random records that just happened
>>> to
>>>> be
>>>>>> in
>>>>>>>> the same batch), in some sense we use the same error for 2
>>> different
>>>>>>>> conditions and understanding that requires a pretty deep
>>> understanding
>>>>> of
>>>>>>>> Kafka internals.
>>>>>>>>
>>>>>>>> -Artem
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
>>>>>> <jols...@confluent.io.invalid
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My concern with respect to it being fragile: the code that
>>> ensures
>>>>> the
>>>>>>>>> error type is internal to the producer. Someone may see it and
>>>> say, I
>>>>>>>> want
>>>>>>>>> to add such and such error. This looks like internal code, so I
>>>> don't
>>>>>>>> need
>>>>>>>>> a KIP, and then they can change it to whatever they want
>> thinking
>>>> it
>>>>> is
>>>>>>>>> within the typical kafka improvement protocol.
>>>>>>>>>
>>>>>>>>> Relying on an internal change to enforce an external API is
>>> fragile
>>>>> in
>>>>>> my
>>>>>>>>> opinion. That's why I sort of agreed with Artem with enforcing
>>> the
>>>>>> error
>>>>>>>> in
>>>>>>>>> the method signature -- part of the public API.
>>>>>>>>>
>>>>>>>>> Chris's comments on providing more information to the handler again
>>>> make
>>>>>> me
>>>>>>>>> wonder if we are solving a problem of lack of information at
>> the
>>>>>>>>> application level with a more powerful solution than we need.
>>> (Ie,
>>>> if
>>>>>> we
>>>>>>>>> had more information, could the application close and restart
>> the
>>>>>>>>> transaction rather than having to drop records) But I am happy
>> to
>>>>>>>>> compromise with a handler that we can agree is sufficiently
>>>>> controlled
>>>>>>>> and
>>>>>>>>> documented.
>>>>>>>>>
>>>>>>>>> Justine
>>>>>>>>>
>>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton
>>>> <chr...@aiven.io.invalid
>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Alieh,
>>>>>>>>>>
>>>>>>>>>> Continuing prior discussions:
>>>>>>>>>>
>>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching
>> point
>>> is
>>>>>>>> that I
>>>>>>>>>> don't see the point in allowing for this kind of pluggable
>> logic
>>>>>>>> without
>>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if
>>> we're
>>>>>> going
>>>>>>>>> to
>>>>>>>>>> implement retries only on "important" topics when a topic
>>>> partition
>>>>>>>> isn't
>>>>>>>>>> found, why wouldn't we also want to be able to do this for
>> other
>>>>>>>> errors?
>>>>>>>>>> Again, taking authorization errors as an example, why wouldn't
>>> we
>>>>> want
>>>>>>>> to
>>>>>>>>>> be able to fail when we can't write to "important" topics
>>> because
>>>>> the
>>>>>>>>>> producer principal lacks sufficient ACLs, and drop the record
>> if
>>>> the
>>>>>>>>> topic
>>>>>>>>>> isn't "important"? In a security-conscious environment with
>>>>>>>>>> runtime-dependent topic routing (which is a common feature of
>>> many
>>>>>>>> source
>>>>>>>>>> connectors, such as the Debezium connectors), this seems
>> fairly
>>>>>> likely.
>>>>>>>>>>
>>>>>>>>>> 2) As far as changing the shape of the API goes, I like
>> Artem's
>>>> idea
>>>>>> of
>>>>>>>>>> splitting out the interface based on specific exceptions. This
>>> may
>>>>> be
>>>>>> a
>>>>>>>>>> little laborious to expand in the future, but if we really
>> want
>>> to
>>>>>>>>>> limit the exceptions that we cover with the handler and move
>>>> slowly
>>>>>> and
>>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in the
>>>>>>>>> interface. I
>>>>>>>>>> also acknowledge that there's no way to completely prevent
>>> people
>>>>> from
>>>>>>>>>> shooting themselves in the foot by implementing the API
>>>> incorrectly,
>>>>>>>> but
>>>>>>>>> I
>>>>>>>>>> think it's worth it to do what we can--including leveraging
>> the
>>>> Java
>>>>>>>>>> language's type system--to help them, so IMO there's value to
>>>>>>>> eliminating
>>>>>>>>>> the implicit behavior of failing when a policy returns RETRY
>>> for a
>>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm
>>> not
>>>>>>>> going
>>>>>>>>> to
>>>>>>>>>> insist on anything specific, but I do want to again raise my
>>>>> concerns
>>>>>>>>> with
>>>>>>>>>> the current proposal and request that we find something a
>> little
>>>>>>>> better.
>>>>>>>>>>
>>>>>>>>>> 3) Concerning the default implementation--actually, I meant
>>> what I
>>>>>>>> wrote
>>>>>>>>> :)
>>>>>>>>>> I don't want a "second" default, I want an implementation of
>>> this
>>>>>>>>> interface
>>>>>>>>>> to be used as the default if no others are specified. The
>>> behavior
>>>>> of
>>>>>>>>> this
>>>>>>>>>> default implementation would be identical to existing behavior
>>> (so
>>>>>>>> there
>>>>>>>>>> would be no backwards compatibility concerns like the ones
>>> raised
>>>> by
>>>>>>>>>> Matthias), but it would be possible to configure this default
>>>>> handler
>>>>>>>>> class
>>>>>>>>>> to behave differently for a basic set of scenarios. This would
>>>>> mirror
>>>>>>>>> (pun
>>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and its
>>>>>>>>>> ReplicationPolicy interface [1]. There is a default
>>> implementation
>>>>>>>>>> available [2] that recognizes a handful of basic configuration
>>>>>>>> properties
>>>>>>>>>> [3] for simple tweaks, but if users want, they can also
>>> implement
>>>>>> their
>>>>>>>>> own
>>>>>>>>>> replication policy for more fine-grained logic if those
>>> properties
>>>>>>>> aren't
>>>>>>>>>> flexible enough.
>>>>>>>>>>
>>>>>>>>>> More concretely, I'm imagining something like this for the
>>>> producer
>>>>>>>>>> exception handler:
>>>>>>>>>>
>>>>>>>>>> - Default implementation class
>>>>>>>>>> of
>>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
>>>>>>>>>> - This class would recognize two properties:
>>>>>>>>>> - drop.invalid.large.records: Boolean property, defaults to
>>>> false.
>>>>> If
>>>>>>>>>> "false", then causes the handler to return FAIL whenever
>>>>>>>>>> a RecordTooLargeException is encountered; if "true", then
>> causes
>>>>>>>>>> SWALLOW/SKIP/DROP to be returned instead
>>>>>>>>>> - unknown.topic.partition.retry.timeout.ms: Integer
>> property,
>>>>>>>> defaults
>>>>>>>>>> to
>>>>>>>>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is
>>>>> encountered,
>>>>>>>>>> causes the handler to return FAIL if that record has been
>>> pending
>>>>> for
>>>>>>>>> more
>>>>>>>>>> than the retry timeout; otherwise, causes RETRY to be returned
>>>>>>>>>>
>>>>>>>>>> I think this is worth addressing now instead of later because
>> it
>>>>>> forces
>>>>>>>>> us
>>>>>>>>>> to evaluate the usefulness of this interface and it addresses
>> a
>>>>>>>>>> long-standing issue not just with Kafka Connect, but with the
>>> Java
>>>>>>>>> producer
>>>>>>>>>> in general. For reference, here are a few tickets I collected
>>>> after
>>>>>>>>> briefly
>>>>>>>>>> skimming our Jira showing that this is a real pain point for
>>>> users:
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although
>>> this
>>>> is
>>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone
>> who
>>>>>>>>> configures
>>>>>>>>>> a producer to use a high retry timeout. I am aware of the
>>>>>> max.block.ms
>>>>>>>>>> property, but it's painful and IMO poor behavior to require
>>> users
>>>> to
>>>>>>>>> reduce
>>>>>>>>>> the value of this property just to handle the single scenario
>>> when
>>>>>>>> trying
>>>>>>>>>> to write to topics that don't exist, since it would also limit
>>> the
>>>>>>>> retry
>>>>>>>>>> timeout for other operations that are legitimately retriable.
>>>>>>>>>>
>>>>>>>>>> Raising new points:
>>>>>>>>>>
>>>>>>>>>> 5) I don't see the interplay between this handler and existing
>>>>>>>>>> retry-related properties mentioned anywhere in the KIP. I'm
>>>> assuming
>>>>>>>> that
>>>>>>>>>> properties like "retries", "max.block.ms", and "
>>>> delivery.timeout.ms
>>>>> "
>>>>>>>>> would
>>>>>>>>>> take precedence over the handler and once they are exhausted,
>>> the
>>>>>>>>>> record/batch will fail no matter what? If so, it's probably
>>> worth
>>>>>>>> briefly
>>>>>>>>>> mentioning this (no more than a sentence or two) in the KIP,
>> and
>>>> if
>>>>>>>> not,
>>>>>>>>>> I'm curious what you have in mind.
>>>>>>>>>>
>>>>>>>>>> 6) I also wonder if the API provides enough information in its
>>>>> current
>>>>>>>>>> form. Would it be possible to provide handlers with some way
>> of
>>>>>>>> tracking
>>>>>>>>>> how long a record has been pending for (i.e., how long it's
>> been
>>>>> since
>>>>>>>>> the
>>>>>>>>>> record was provided as an argument to Producer::send)? One way
>>> to
>>>> do
>>>>>>>> this
>>>>>>>>>> could be to add a method like `onNewRecord(ProducerRecord)`
>> and
>>>>>>>>>> allow/require the handler to do its own bookkeeping, probably
>>>> with a
>>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that the
>>>>> handler
>>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to
>>> track
>>>>>>>>> records.
>>>>>>>>>> An alternative could be to include information about the
>> initial
>>>>> time
>>>>>>>> the
>>>>>>>>>> record was received by the producer and the number of retries
>>> that
>>>>>> have
>>>>>>>>>> been performed for it as parameters in the handle method(s),
>> but
>>>> I'm
>>>>>>>> not
>>>>>>>>>> sure how easy this would be to implement and it might clutter
>>>> things
>>>>>>>> up a
>>>>>>>>>> bit too much.
>>>>>>>>>>
>>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer,
>>>>>>>>> AutoCloseable)
>>>>>>>>>> as a superinterface for the handler interface?
>>>>>>>>>>
>>>>>>>>>> [1] -
>>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
>>>>>>>>>> [2] -
>>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
>>>>>>>>>> [3] -
>>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
>>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Chris
>>>>>>>>>>
>>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <
>>> mj...@apache.org
>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Very interesting discussion.
>>>>>>>>>>>
>>>>>>>>>>> Seems a central point is the question about "how generic" we
>>>>> approach
>>>>>>>>>>> this, and some people think we need to be conservative and
>>> others
>>>>>>>> think
>>>>>>>>>>> we should try to be as generic as possible.
>>>>>>>>>>>
>>>>>>>>>>> Personally, I think following a limited scope for this KIP
>> (by
>>>>>>>>>>> explicitly saying we only cover RecordTooLarge and
>>>>>>>>>>> UnknownTopicOrPartition) might be better. We have concrete
>>>> tickets
>>>>>>>> that
>>>>>>>>>>> we address, while for other exception (like authorization) we
>>>> don't
>>>>>>>>> know
>>>>>>>>>>> if people want to handle it to begin with. Boiling the ocean
>>>> might
>>>>>>>> not
>>>>>>>>>>> get us too far, and being somewhat pragmatic might help to
>> move
>>>>> this
>>>>>>>>> KIP
>>>>>>>>>>> forward. -- I also agree with Justine and Artem, that we want
>> to
>>>> be
>>>>>>>>>>> careful anyway to not allow users to break stuff too easily.
>>>>>>>>>>>
>>>>>>>>>>> At the same time, I agree that we should set up this change
>> in a
>>>>>>>> forward
>>>>>>>>>>> looking way, and thus having a single generic handler allows
>> us
>>>> to
>>>>>>>>> later
>>>>>>>>>>> extend the handler more easily. This should also simplify
>>> follow
>>>> up
>>>>>>>> KIPs
>>>>>>>>>>> that might add new error cases (I actually mentioned one more
>>> to
>>>>>>>> Alieh
>>>>>>>>>>> already, but we both agreed that it might be best to exclude
>> it
>>>>> from
>>>>>>>>> the
>>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow up
>> KIP
>>> is
>>>>> not
>>>>>>>>>>> the end of the world.)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Chris:
>>>>>>>>>>>
>>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it
>> actually
>>>>> would
>>>>>>>>> be?
>>>>>>>>>>> If the contract is clearly defined, it seems to be fine what
>>> the
>>>>> KIP
>>>>>>>>>>> proposes, and given that such a handler is an expert API, and
>>> we
>>>>> can
>>>>>>>>>>> provide "best practices" (cf my other comment below in
>> [AL1]),
>>>>> being
>>>>>>>> a
>>>>>>>>>>> little bit pragmatic sounds fine to me.
>>>>>>>>>>>
>>>>>>>>>>> Not sure if I understand Justine's argument on this question?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> (3) About having a default impl or not. I am fine with adding
>>>> one,
>>>>>>>> even
>>>>>>>>>>> if I am not sure why Connect could not just add its own one
>> and
>>>>> plug
>>>>>>>> it
>>>>>>>>>>> in (and we would add corresponding configs for Connect, but
>> not
>>>> for
>>>>>>>> the
>>>>>>>>>>> producer itself)? For this case, we could also do this as a
>>>> follow
>>>>> up
>>>>>>>>>>> KIP, but happy to include it in this KIP to provide value to
>>>>> Connect
>>>>>>>>>>> right away (even if the value might not come right away if we
>>>> miss
>>>>>>>> the
>>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we
>> would
>>>> for
>>>>>>>>> sure
>>>>>>>>>>> plug in our own impl, and lock down the config such that users
>>>>> cannot
>>>>>>>>> set
>>>>>>>>>>> their own handler on the internal producer to begin with.
>> Might
>>>> be
>>>>>>>> good
>>>>>>>>>>> to elaborate why the producer should have a default? We might
>>>>>>>> actually
>>>>>>>>>>> want to add this to the KIP right away?
>>>>>>>>>>>
>>>>>>>>>>> The key for a default impl would be, to not change the
>> current
>>>>>>>>> behavior,
>>>>>>>>>>> and having no default seems to achieve this. For the two
>> cases
>>>> you
>>>>>>>>>>> mentioned, it's unclear to me what default value on "upper
>>> bound
>>>> on
>>>>>>>>>>> retries" for UnknownTopicOrPartitionException we should set?
>>> Seems
>>>>> it
>>>>>>>>>>> would need to be the same as `delivery.timeout.ms`? However,
>>> if
>>>>>>>> users
>>>>>>>>>>> have `delivery.timeout.ms` actually overwritten we would
>> need
>>> to
>>>>> set
>>>>>>>>>>> this config somewhat "dynamic"? Is this feasible? If we
>>>> hard-code 2
>>>>>>>>>>> minutes, it might not be backward compatible. I have the
>>>> impression
>>>>>>>> we
>>>>>>>>>>> might introduce some undesired coupling? -- For the "record
>> too
>>>>>>>> large"
>>>>>>>>>>> case, the config seems to be boolean and setting it to
>> `false`
>>> by
>>>>>>>>>>> default seems to provide backward compatibility.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Artem:
>>>>>>>>>>>
>>>>>>>>>>> [AL1] While I see the point, I would think having a different
>>>>>>>> callback
>>>>>>>>>>> for every exception might not really be elegant? In the end,
>>> the
>>>>>>>>> handler
>>>>>>>>>>> is a very advanced feature anyway, and if it's implemented
>> in
>>> a
>>>>> bad
>>>>>>>>>>> way, well, it's a user error -- we cannot protect users from
>>>>>>>>> everything.
>>>>>>>>>>> To me, a handler like this is to some extent "business
>> logic"
>>>> and
>>>>>>>> if a
>>>>>>>>>>> user gets business logic wrong, it's hard to protect them. --
>>> We
>>>>>>>> would
>>>>>>>>>>> of course provide best practice guidance in the JavaDocs, and
>>>>> explain
>>>>>>>>>>> that a handler should have explicit `if` statements for stuff
>>> it
>>>>> wants
>>>>>>>>> to
>>>>>>>>>>> handle, and only a single default which returns FAIL.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application
>> layer.
>>>> Ie,
>>>>>>>> the
>>>>>>>>>>> TX is not completed, we clean up and set up our task from
>>> scratch,
>>>>> to
>>>>>>>>>>> ensure the pending transaction is completed before we resume.
>>> If
>>>>> the
>>>>>>>> TX
>>>>>>>>>>> was indeed aborted, we would retry from an older offset and thus
>>>> just
>>>>>>>> hit
>>>>>>>>>>> the same error again and the loop begins again.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some
>> extent
>>>> as
>>>>>>>>>>> business logic. If a user puts a bad filter condition in
>> their
>>> KS
>>>>>>>> app,
>>>>>>>>>>> and drops messages, there is nothing we can do about it, and this
>>>> handler
>>>>>>>>>>> IMHO, has a similar purpose. This is also the line of
>> thinking
>>> I
>>>>>>>> apply
>>>>>>>>>>> to EOS, to address Justine's concern about "should we allow to
>>>> drop
>>>>>>>> for
>>>>>>>>>>> EOS", and my answer is "yes", because it's more business
>> logic
>>>> than
>>>>>>>>>>> actual error handling IMHO. And by default, we fail... So
>> users
>>>>>>>> opt-in
>>>>>>>>>>> to add business logic to drop records. It's an application
>>> level
>>>>>>>>>>> decision how to write the code.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
>>>>> checking
>>>>>>>>> the
>>>>>>>>>>> size of the record upfront is exactly what the KIP proposes?
>>> No?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Justine:
>>>>>>>>>>>
>>>>>>>>>>>> I saw the sample
>>>>>>>>>>>> code -- is it just an if statement checking for the error
>>> before
>>>>>>>> the
>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
>>>>>>>>>>>
>>>>>>>>>>> What do you mean by fragile? Not sure if I see your point.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP.  The motivation talks about very
>> specific
>>>>>>>> cases,
>>>>>>>>>> but
>>>>>>>>>>>> the interface is generic.
>>>>>>>>>>>>
>>>>>>>>>>>> [AL1]
>>>>>>>>>>>> If the interface evolves in the future I think we could have
>>> the
>>>>>>>>>>> following
>>>>>>>>>>>> confusion:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. A user implemented SWALLOW action for both
>>>>>>>> RecordTooLargeException
>>>>>>>>>> and
>>>>>>>>>>>> UnknownTopicOrPartitionException.  For simplicity they just
>>>> return
>>>>>>>>>> SWALLOW
>>>>>>>>>>>> from the function, because it elegantly handles all known
>>> cases.
>>>>>>>>>>>> 2. The interface has evolved to support a new exception.
>>>>>>>>>>>> 3. The user has upgraded their Kafka client.
>>>>>>>>>>>>
>>>>>>>>>>>> Now a new kind of error gets dropped on the floor without
>>> user's
>>>>>>>>>>> intention
>>>>>>>>>>>> and it would be super hard to detect and debug.
>>>>>>>>>>>>
>>>>>>>>>>>> To avoid the confusion, I think we should use handlers for
>>>>> specific
>>>>>>>>>>>> exceptions.  Then if a new exception is added it won't get
>>>>> silently
>>>>>>>>>>> swallowed
>>>>>>>>>>>> because the user would need to add new functionality to
>> handle
>>>> it.
>>>>>>>>>>>>
>>>>>>>>>>>> I also have some higher level comments:
>>>>>>>>>>>>
>>>>>>>>>>>> [AL2]
>>>>>>>>>>>>> it throws a TimeoutException, and the user can only blindly
>>>>> retry,
>>>>>>>>>> which
>>>>>>>>>>>> may result in an infinite retry loop
>>>>>>>>>>>>
>>>>>>>>>>>> If the TimeoutException happens during transactional
>>> processing
>>>>>>>>>> (exactly
>>>>>>>>>>>> once is the desired semantics), then the client should not
>>> retry
>>>>>>>> when
>>>>>>>>>> it
>>>>>>>>>>>> gets TimeoutException because without knowing the reason for
>>>>>>>>>>>> TimeoutExceptions, the client cannot know whether the
>> message
>>>> got
>>>>>>>>>>> actually
>>>>>>>>>>>> produced or not and retrying the message may result in
>>>>> duplicates.
>>>>>>>>>>>>
>>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the
>>>>>>>> underlying
>>>>>>>>>> root
>>>>>>>>>>>> cause of missing metadata
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe we should fix the error handling and return the proper
>>>>>>>>> underlying
>>>>>>>>>>>> message?  Then the application can properly handle the
>> message
>>>>>>>> based
>>>>>>>>> on
>>>>>>>>>>>> preferences.
>>>>>>>>>>>>
>>>>>>>>>>>> From the product perspective, it's not clear how safe it is
>> to
>>>>>>>>> blindly
>>>>>>>>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
>>>>>>>>> situations
>>>>>>>>>>>> when a simple typo could lead to massive data loss (part of
>>> the
>>>>>>>> data
>>>>>>>>>>> would
>>>>>>>>>>>> effectively be produced to a "black hole" and the user may
>> not
>>>>>>>> notice
>>>>>>>>>> it
>>>>>>>>>>>> for a while).
>>>>>>>>>>>>
>>>>>>>>>>>> In which situations would you recommend the user to "black
>>> hole"
>>>>>>>>>> messages
>>>>>>>>>>>> in case of misconfiguration?
>>>>>>>>>>>>
>>>>>>>>>>>> [AL3]
>>>>>>>>>>>>
>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
>>>>>>>>> RecordTooLargeException,
>>>>>>>>>>>>
>>>>>>>>>>>> It is my understanding that this KIP proposes
>>> functionality
>>>>>>>> that
>>>>>>>>>>> would
>>>>>>>>>>>> only be able to SWALLOW RecordTooLargeException that happen
>>>>> because
>>>>>>>>> the
>>>>>>>>>>>> producer cannot produce the record (if the broker rejects
>> the
>>>>>>>> batch,
>>>>>>>>>> the
>>>>>>>>>>>> error won't get to the handler, because we cannot know which
>>>> other
>>>>>>>>>>> records
>>>>>>>>>>>> get ignored).  In this case, why not just check the locally
>>>>>>>>> configured
>>>>>>>>>>> max
>>>>>>>>>>>> record size upfront and not produce the record in the first
>>>>> place?
>>>>>>>>>>> Maybe
>>>>>>>>>>>> we can expose a validation function from the producer that
>>> could
>>>>>>>>>> validate
>>>>>>>>>>>> the records locally, so we don't need to produce the record
>> in
>>>>>>>> order
>>>>>>>>> to
>>>>>>>>>>>> know that it's invalid.
>>>>>>>>>>>>
>>>>>>>>>>>> -Artem
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
>>>>>>>>>>> <jols...@confluent.io.invalid>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Alieh and Chris,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess
>> I
>>>> just
>>>>>>>>>> didn't
>>>>>>>>>>>>> understand how that would be ensured on the producer side.
>> I
>>>> saw
>>>>>>>> the
>>>>>>>>>>> sample
>>>>>>>>>>>>> code -- is it just an if statement checking for the error
>>>> before
>>>>>>>> the
>>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you clarify what you mean by `since the code does not
>>> reach
>>>>>>>> the
>>>>>>>>> KS
>>>>>>>>>>>>> interface and breaks somewhere in producer.` If we surfaced
>>>> this
>>>>>>>>> error
>>>>>>>>>>> to
>>>>>>>>>>>>> the application in a better way would that also be a
>> solution
>>>> to
>>>>>>>> the
>>>>>>>>>>> issue?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
>>>>>>>>>>> <asae...@confluent.io.invalid>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you, Chris and Justine, for the feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Chris
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Flexibility: it has two meanings. The first meaning is
>>> the
>>>>> one
>>>>>>>>> you
>>>>>>>>>>>>>> mentioned. We are going to cover more exceptions in the
>>>> future,
>>>>>>>> but
>>>>>>>>>> as
>>>>>>>>>>>>>> Justine mentioned, we must be very conservative about
>> adding
>>>>> more
>>>>>>>>>>>>>> exceptions. Additionally, flexibility mainly means that
>> the
>>>> user
>>>>>>>> is
>>>>>>>>>>> able
>>>>>>>>>>>>> to
>>>>>>>>>>>>>> develop their own code. As mentioned in the motivation
>>> section
>>>>>>>> and
>>>>>>>>>> the
>>>>>>>>>>>>>> examples, sometimes the user decides on dropping a record
>>>> based
>>>>>>>> on
>>>>>>>>>> the
>>>>>>>>>>>>>> topic, for example.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2) Defining two separate methods for retriable and
>>>> non-retriable
>>>>>>>>>>>>>> exceptions: although the idea is brilliant, the user may
>>> still
>>>>>>>>> make a
>>>>>>>>>>>>>> mistake by implementing the wrong method and see
>>>> unexpected
>>>>>>>>>>>>> behaviour.
>>>>>>>>>>>>>> For example, he may implement handleRetriable() for
>>>>>>>>>>>>> RecordTooLargeException
>>>>>>>>>>>>>> and define SWALLOW for the exception, but in practice, he
>>> sees
>>>>> no
>>>>>>>>>>> change
>>>>>>>>>>>>> in
>>>>>>>>>>>>>> default behaviour since he implemented the wrong method. I
>>>> think
>>>>>>>> we
>>>>>>>>>> can
>>>>>>>>>>>>>> never reduce the user’s mistakes to 0.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3) Default implementation for Handler: the default
>> behaviour
>>>> is
>>>>>>>>>> already
>>>>>>>>>>>>>> preserved with NO need of implementing any handler or
>>> setting
>>>>> the
>>>>>>>>>>>>>> corresponding config parameter `custom.exception.handler`.
>>>> What
>>>>>>>> you
>>>>>>>>>>> mean
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> actually having a second default, which requires having
>> both
>>>>>>>>>> interface
>>>>>>>>>>>>> and
>>>>>>>>>>>>>> config parameters. About UnknownTopicOrPartitionException:
>>> the
>>>>>>>>>> producer
>>>>>>>>>>>>>> already offers the config parameter `max.block.ms` which
>>>>>>>>> determines
>>>>>>>>>>> the
>>>>>>>>>>>>>> duration of retrying. The main purpose of the user who
>> needs
>>>> the
>>>>>>>>>>> handler
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> to get the root cause of TimeoutException and handle it in
>>> the
>>>>>>>> way
>>>>>>>>> he
>>>>>>>>>>>>>> intends. The KIP explains the necessity of it for KS
>> users.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the
>>>>> error,
>>>>>>>>>> while
>>>>>>>>>>>>>> SKIP means skip the record, I think. If it makes sense for
>>>> more
>>>>>>>>> ppl,
>>>>>>>>>> I
>>>>>>>>>>>>> can
>>>>>>>>>>>>>> change it to SKIP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Justine
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) was addressed by Chris.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2 and 3) The problem is exactly what you mentioned.
>>> Currently,
>>>>>>>>> there
>>>>>>>>>> is
>>>>>>>>>>>>> no
>>>>>>>>>>>>>> way to handle these issues application-side. Even KS users
>>> who
>>>>>>>>>>> implement
>>>>>>>>>>>>> KS
>>>>>>>>>>>>>> ProductionExceptionHandler are not able to handle the
>>>> exceptions
>>>>>>>> as
>>>>>>>>>>> they
>>>>>>>>>>>>>> intend since the code does not reach the KS interface and
>>>> breaks
>>>>>>>>>>>>> somewhere
>>>>>>>>>>>>>> in producer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Alieh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
>>>>>>>>>> fearthecel...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Justine,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The method signatures for the interface are indeed
>>>> open-ended,
>>>>>>>> but
>>>>>>>>>> the
>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>> states that its uses will be limited. See the motivation
>>>>>>>> section:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We believe that the user should be able to develop
>> custom
>>>>>>>>> exception
>>>>>>>>>>>>>>> handlers for managing producer exceptions. On the other
>>> hand,
>>>>>>>> this
>>>>>>>>>>> will
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> an expert-level API, and using that may result in strange
>>>>>>>>> behaviour
>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> system, making it hard to find the root cause. Therefore,
>>> the
>>>>>>>>> custom
>>>>>>>>>>>>>>> handler is currently limited to handling
>>>>> RecordTooLargeException
>>>>>>>>> and
>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
>>>>>>>>>>> <jols...@confluent.io.invalid
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was out for KSB and then was also sick. :(
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> To your point 1) Chris, I don't think it is limited to
>> two
>>>>>>>>> specific
>>>>>>>>>>>>>>>> scenarios, since the interface accepts a generic
>>> Exception e
>>>>>>>> and
>>>>>>>>>> can
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> implemented to check if that e is an instanceof any
>>>> exception.
>>>>>>>> I
>>>>>>>>>>>>> didn't
>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>> anywhere that specific errors are enforced. I'm a bit
>>>>> concerned
>>>>>>>>>> about
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> actually. I'm concerned about the open-endedness and
>> the
>>>>>>>>> contract
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>> with transactions. We are allowing the client to make
>>>>> decisions
>>>>>>>>>> that
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> somewhat invisible to the server. As an aside, can we
>>> build
>>>> in
>>>>>>>>> log
>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>> when the handler decides to skip etc a message. I'm
>> really
>>>>>>>>>> concerned
>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>> messages being silently dropped.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I do think Chris's point 2) about retriable vs non
>>> retriable
>>>>>>>>> errors
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic
>>> or
>>>>>>>>>> partition
>>>>>>>>>>>>>>>> exception too early, as there are cases where it can be
>>>>>>>>> transient.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm still a little bit wary of allowing dropping records
>>> as
>>>>>>>> part
>>>>>>>>> of
>>>>>>>>>>>>> EOS
>>>>>>>>>>>>>>>> generally as in many cases, these errors signify an
>> issue
>>>> with
>>>>>>>>> the
>>>>>>>>>>>>>>> original
>>>>>>>>>>>>>>>> data. I understand that streams and connect/mirror maker
>>> may
>>>>>>>> have
>>>>>>>>>>>>>> reasons
>>>>>>>>>>>>>>>> they want to progress past these messages, but wondering
>>> if
>>>>>>>> there
>>>>>>>>>> is
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>> that can be done application-side. I'm willing to accept
>>>> this
>>>>>>>>> sort
>>>>>>>>>> of
>>>>>>>>>>>>>>>> proposal if we can make it clear that this sort of thing
>>> is
>>>>>>>>>> happening
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> we limit the blast radius for what we can do.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
>>>>>>>>>> <chr...@aiven.io.invalid
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Sorry for the delay, I've been out sick. I still have
>>> some
>>>>>>>>>> thoughts
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> I'd like to see addressed before voting.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable
>>>>> interface,
>>>>>>>>> why
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> only limiting the uses for this interface to two very
>>>>> specific
>>>>>>>>>>>>>>> scenarios?
>>>>>>>>>>>>>>>>> Why not also allow, e.g., authorization errors to be
>>>> handled
>>>>>>>> as
>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>>> (allowing users to drop records destined for some
>>>> off-limits
>>>>>>>>>>>>> topics,
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> retry for a limited duration in case there's a delay in
>>> the
>>>>>>>>>>>>>> propagation
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of
>> other
>>>>>>>> errors
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>> be handled with this new API, both to avoid the
>> follow-up
>>>>> work
>>>>>>>>> of
>>>>>>>>>>>>>>> another
>>>>>>>>>>>>>>>>> KIP to address them in the future, and to make sure
>> that
>>>>> we're
>>>>>>>>> not
>>>>>>>>>>>>>>>> painting
>>>>>>>>>>>>>>>>> ourselves into a corner with the current API in a way
>>> that
>>>>>>>> would
>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>> future modifications difficult.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
>>>>>>>>> non-retriable
>>>>>>>>>>>>>>> errors
>>>>>>>>>>>>>>>>> are handled with the interface. Why not introduce two
>>>>> separate
>>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> handle each case separately? That way there's no
>>> ambiguity
>>>> or
>>>>>>>>>>>>>> implicit
>>>>>>>>>>>>>>>>> behavior when, e.g., attempting to retry on a
>>>>>>>>>>>>>> RecordTooLargeException.
>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>> could be something like `NonRetriableResponse
>>>>>>>>>>>>> handle(ProducerRecord,
>>>>>>>>>>>>>>>>> Exception)` and `RetriableResponse
>>>>>>>>> handleRetriable(ProducerRecord,
>>>>>>>>>>>>>>>>> Exception)`, though the exact names and shape can
>>> obviously
>>>>> be
>>>>>>>>>>>>> toyed
>>>>>>>>>>>>>>>> with a
>>>>>>>>>>>>>>>>> bit.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3) Although the flexibility of a pluggable interface
>> may
>>>>>>>> benefit
>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>> users' custom producer applications and Kafka Streams
>>>>>>>>>> applications,
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> comes at significant deployment cost for other
>>> low-/no-code
>>>>>>>>>>>>>>> environments,
>>>>>>>>>>>>>>>>> including but not limited to Kafka Connect and
>>> MirrorMaker
>>>> 2.
>>>>>>>>> Can
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> default implementation of the exception handler that
>>> allows
>>>>>>>> for
>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>> simple
>>>>>>>>>>>>>>>>> behavior to be tweaked via configuration property? Two
>>>> things
>>>>>>>>> that
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> nice to have would be A) an upper bound on the retry
>> time
>>>> for
>>>>>>>>>>>>>>>>> unknown-topic-partition exceptions and B) an option to
>>> drop
>>>>>>>>>> records
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> are large enough to trigger a record-too-large
>> exception.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of
>>> the
>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
>>>>>>>>> especially
>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>> trying to guess the behavior for retriable errors.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> A summary of the KIP and the discussions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The KIP introduces a handler interface for Producer in
>>>> order
>>>>>>>> to
>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>> two
>>>>>>>>>>>>>>>>>> exceptions: RecordTooLargeException and
>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
>>>>>>>>>>>>>>>>>> The handler handles the exceptions per-record.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
>>>>>>>> sections]
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the
>>> producer
>>>>>>>>>>>>> collects
>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>> records in batches. Then a RecordTooLargeException
>>> related
>>>>>>>> to a
>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>> record leads to failing the entire batch. A custom
>>>> exception
>>>>>>>>>>>>>> handler
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> this case may decide on dropping the record and
>>> continuing
>>>>>>>> the
>>>>>>>>>>>>>>>>> processing.
>>>>>>>>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka
>> Streams, a
>>>>>>>> record
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> too
>>>>>>>>>>>>>>>>>> large is a poison pill record, and there is no way to
>>> skip
>>>>>>>> over
>>>>>>>>>>>>>> it. A
>>>>>>>>>>>>>>>>>> handler would allow us to react to this error inside
>> the
>>>>>>>>>>>>> producer,
>>>>>>>>>>>>>>>> i.e.,
>>>>>>>>>>>>>>>>>> local to where the error happens, and thus simplify
>> the
>>>>>>>> overall
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>> significantly. Please read the Motivation section for
>>> more
>>>>>>>>>>>>>>> explanation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the
>>>>> producer
>>>>>>>>>>>>>> handles
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> exception internally and only issues a WARN log about
>>>>> missing
>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> retries internally. Later, when the producer hits "
>>>>>>>>>>>>>>> delivery.timeout.ms"
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>> throws a TimeoutException, and the user can only
>> blindly
>>>>>>>> retry,
>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>> result in an infinite retry loop. The thrown
>>>>> TimeoutException
>>>>>>>>>>>>>> "cuts"
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> connection to the underlying root cause of missing
>>>> metadata
>>>>>>>>>>>>> (which
>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>> indeed be a transient error but is persistent for a
>>>>>>>>> non-existing
>>>>>>>>>>>>>>>> topic).
>>>>>>>>>>>>>>>>>> Thus, there is no programmatic way to break the
>> infinite
>>>>>>>> retry
>>>>>>>>>>>>>> loop.
>>>>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>> Streams also blindly retries for this case, and the
>>>>>>>> application
>>>>>>>>>>>>>> gets
>>>>>>>>>>>>>>>>> stuck.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Having interface vs configuration option:
>> [Motivation,
>>>>>>>>>>>>> Examples,
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> Rejected Alternatives sections]
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Our solution is introducing an interface due to the
>> full
>>>>>>>>>>>>>> flexibility
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams
>>> ones,
>>>>>>>>>>>>>> determine
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> handler's behaviour based on the situation. For
>>> example,
>>>>>>>>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may
>>>> want
>>>>>>>> to
>>>>>>>>>>>>>> raise
>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>> error for some topics but retry it for other topics.
>>>> Having
>>>>> a
>>>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>> option with a fixed set of possibilities does not
>> serve
>>>> the
>>>>>>>>>>>>> user's
>>>>>>>>>>>>>>>>>> needs. See Example 2, please.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
>>>>>>>> section]
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
>>>>>>>>>>>>>> RecordTooLargeException,
>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>> this record will not be a part of the batch of
>>>> transactions
>>>>>>>> and
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>> not be sent to the broker in non-transactional mode.
>> So
>>> no
>>>>>>>>>>>>> worries
>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>> getting a RecordTooLargeException from the broker in
>>> this
>>>>>>>> case,
>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW
>>>> means
>>>>>>>>> drop
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>> and continue/swallow the error.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - What if the handle() method implements RETRY for
>>>>>>>>>>>>>>>>> RecordTooLargeException?
>>>>>>>>>>>>>>>>>> [Proposed Changes section]
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW
>>> for
>>>>>>>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal
>>> to
>>>>>>>> FAIL.
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> well documented/informed in javadoc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - What if the handle() method of the handler throws an
>>>>>>>>> exception?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The handler is expected to have correct code. If it
>>> throws
>>>>> an
>>>>>>>>>>>>>>>> exception,
>>>>>>>>>>>>>>>>>> everything fails.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is a PoC PR <
>>>>> https://github.com/apache/kafka/pull/15846
>>>>>>>>>
>>>>>>>>>>>>> ONLY
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>> RecordTooLargeException. The code changes related to
>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this
>>> PR
>>>>>>>>> LATER.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Looking forward to your feedback again.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Alieh
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
>>>>>>>> k...@kirktrue.pro>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Alieh,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the updates!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Comments inline...
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
>>>>>>>>>>>>>>>>> <asae...@confluent.io.INVALID
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Addressing some of the main concerns:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by
>>> broker,
>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
>>>>>>>> interface
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> introduced
>>>>>>>>>>>>>>>>>>>> to affect only the exceptions thrown from the
>>> producer.
>>>>>>>> This
>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>> specifically means to provide a possibility to
>> manage
>>>> the
>>>>>>>>>>>>>>>>>>>> `RecordTooLargeException` thrown from the
>>>> Producer.send()
>>>>>>>>>>>>>> method.
>>>>>>>>>>>>>>>>>> Please
>>>>>>>>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
>>>>>>>>>>>>> investigated
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> issue
>>>>>>>>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern
>>>> about
>>>>>>>> how
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> errors as well.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are
>>>>> called
>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>> sent to the server is acknowledged, while this is
>> not
>>>> the
>>>>>>>>>>>>>> desired
>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> all exceptions. We intend to handle exceptions
>>>> beforehand.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I guess it makes sense to keep the expectation for
>> when
>>>>>>>>>>>>> Callback
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - What if the custom handler returns RETRY for
>>>>>>>>>>>>>>>>>>> `RecordTooLargeException`? I
>>>>>>>>>>>>>>>>>>>> assume changing the producer configuration at
>> runtime
>>> is
>>>>>>>>>>>>>>> possible.
>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>> so,
>>>>>>>>>>>>>>>>>>>> RETRY for a too large record is valid because maybe
>> in
>>>> the
>>>>>>>>>>>>> next
>>>>>>>>>>>>>>>> try,
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> too large record is not poisoning any more. I am not
>>>> 100%
>>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> technical details, though. Otherwise, we can
>> consider
>>>> the
>>>>>>>>>>>>> RETRY
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> FAIL
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> this exception. Another solution would be to
>> consider
>>> a
>>>>>>>>>>>>>> constant
>>>>>>>>>>>>>>>>> number
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> times for RETRY which can be useful for other
>>> exceptions
>>>>> as
>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It’s not presently possible to change the
>> configuration
>>>> of
>>>>>>>> an
>>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>> Producer at runtime. So if a record hits a
>>>>>>>>>>>>>> RecordTooLargeException
>>>>>>>>>>>>>>>>> once,
>>>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>> amount of retrying (with the current Producer) will
>>>> change
>>>>>>>>> that
>>>>>>>>>>>>>>> fact.
>>>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>>>>> I’m still a little stuck on how to handle a response
>> of
>>>>>>>> RETRY
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>> “oversized” record.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - What if the handle() method itself throws an
>>>> exception?
>>>>> I
>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be
>>>>> exactly
>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>> custom handler is defined since the user actually
>> did
>>>> not
>>>>>>>>>>>>> have
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> working
>>>>>>>>>>>>>>>>>>>> handler.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is
>>> the
>>>>>>>> right
>>>>>>>>>>>>>>>> choice.
>>>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>> then becomes a silent failure that might have
>>>>> repercussions,
>>>>>>>>>>>>>>>> depending
>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>> the business logic. A user would have to proactively
>>>> trawl
>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - Why not use config parameters instead of an
>>> interface?
>>>>> As
>>>>>>>>>>>>>>>> explained
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that
>>> the
>>>>>>>>>>>>> handler
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>> used for a greater number of exceptions in the
>> future.
>>>>>>>>>>>>>> Defining a
>>>>>>>>>>>>>>>>>>>> configuration parameter for each exception may make
>>> the
>>>>>>>>>>>>>>>>> configuration a
>>>>>>>>>>>>>>>>>>> bit
>>>>>>>>>>>>>>>>>>>> messy. Moreover, the handler offers more
>> flexibility.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Agreed that the logic-via-configuration approach is
>>> weird
>>>>>>>> and
>>>>>>>>>>>>>>>> limiting.
>>>>>>>>>>>>>>>>>>> Forget I ever suggested it ;)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I’d think additional background in the Motivation
>>> section
>>>>>>>>> would
>>>>>>>>>>>>>>> help
>>>>>>>>>>>>>>>> me
>>>>>>>>>>>>>>>>>>> understand how users might use this feature beyond a)
>>>>>>>> skipping
>>>>>>>>>>>>>>>>>> “oversized”
>>>>>>>>>>>>>>>>>>> records, and b) not retrying missing topics.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Small change:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for
>>>>> brevity
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> simplicity.
>>>>>>>>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The name change sounds good to me.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks Alieh!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I thank you all again for your useful
>>>>>>>> questions/suggestions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would be happy to hear more of your concerns, as
>>>> stated
>>>>>>>> in
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>> feedback.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Alieh
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks Alieh for the updates.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm a little concerned about the design pattern
>> here.
>>>> It
>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>>>> specific usages, but we are packaging it as a
>> generic
>>>>>>>>>>>>> handler.
>>>>>>>>>>>>>>>>>>>>> I think we tried to narrow down on the specific
>>> errors
>>>> we
>>>>>>>>>>>>> want
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> handle,
>>>>>>>>>>>>>>>>>>>>> but it feels a little clunky as we have a generic
>>> thing
>>>>>>>> for
>>>>>>>>>>>>>> two
>>>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>>> errors.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to
>>>> solve
>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>> problems. I
>>>>>>>>>>>>>>>>>>>>> agree though that we will need something more than
>>> the
>>>>>>>> error
>>>>>>>>>>>>>>>> classes
>>>>>>>>>>>>>>>>>> I'm
>>>>>>>>>>>>>>>>>>>>> proposing if we want to have different handling be
>>>>>>>>>>>>>> configurable.
>>>>>>>>>>>>>>>>>>>>> My concern is that the open-endedness of a handler
>>>> means
>>>>>>>>>>>>> that
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>> creating more problems than we are solving. It is
>>> still
>>>>>>>>>>>>>> unclear
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> me
>>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could
>>>> include
>>>>>>>> an
>>>>>>>>>>>>>>>> example?
>>>>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>>>> seems like there is a specific use case in mind and
>>>> maybe
>>>>>>>> we
>>>>>>>>>>>>>> can

