Guozhang - thank you for your thoughts.

You are right - this is more about the producer client than the streams
client.

> caching the metadata outside the producer, e.g. in an admin client, would
> not be a perfect solution, since either way your metadata cache, whether
> inside the producer or inside the admin client, is not guaranteed to
> always be up to date


It's not perfect, but fortunately in my case it's "good enough", as the main
concern is preventing poorly behaved clients from holding up processing
because they forgot to create the topics their records should be sent to.
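
Concretely, the workaround I have in mind is something like the sketch
below - the class and method names are my own, not an existing API, and it
accepts that the cached view can be a little stale:

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;

// Periodically refreshed cache of existing topic names, used to log and
// drop records whose extracted topic was never created.
public class TopicExistenceCache implements AutoCloseable {
    private final Admin admin;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<Set<String>> knownTopics =
        new AtomicReference<>(Collections.emptySet());

    public TopicExistenceCache(Map<String, Object> adminConfig, long refreshSeconds) {
        this.admin = Admin.create(adminConfig);
        scheduler.scheduleAtFixedRate(this::refresh, 0, refreshSeconds, TimeUnit.SECONDS);
    }

    private void refresh() {
        try {
            // listTopics() returns all topic names visible to the client.
            knownTopics.set(admin.listTopics().names().get(10, TimeUnit.SECONDS));
        } catch (Exception e) {
            // Keep serving the stale set on failure; staleness is the
            // accepted trade-off of this approach.
        }
    }

    public boolean exists(String topic) {
        return knownTopics.get().contains(topic);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
        admin.close();
    }
}

In the topology this would back a filter() placed just ahead of the to()
call that uses the TopicNameExtractor, so records aimed at missing topics
are logged and dropped before the producer ever sees them.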

letting the send() call fail with an UnknownTopicOrPartitionException and
> pushing the burden onto the caller to decide what to do (either wait and
> retry, or give up and stop the world, etc.) may work, but that requires
> modifying the interface semantics, or at least adding an overloaded
> "send()". Maybe worth discussing in a KIP.


The more I think about it, the more I like the idea of differentiating
between a metadata refresh timeout and the case where the metadata was
refreshed yet still didn't contain the topic (or partition). I'll take a
closer look at the existing implementation and try to find some time to
write a KIP for this - as you pointed out, this modifies the interface
semantics, so it would need to be an additive, opt-in change.
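
To make that concrete, the calling pattern I'm imagining an opt-in change
could enable is roughly the following - the "fail.fast.on.unknown.topic"
config is hypothetical, purely for illustration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FailFastSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Hypothetical opt-in flag - no such producer config exists today:
        props.put("fail.fast.on.unknown.topic", "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("some-dynamic-topic", "value"),
                (metadata, exception) -> {
                    if (exception instanceof UnknownTopicOrPartitionException) {
                        // Metadata refreshed fine but the topic is absent:
                        // safe to log and drop.
                    } else if (exception instanceof TimeoutException) {
                        // The metadata refresh itself timed out.
                    }
                });
        }
    }
}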

since max.block.ms is a global config it may also affect other blocking
> calls, like txn-related ones


Yes, that was my concern with this approach as well, which is why I think
the admin client workaround is my best option at the moment.
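
For anyone following along, the max.block.ms approach from point 3 would
look roughly like this (values illustrative), and it shows the ambiguity:
the synchronous TimeoutException from send() can't distinguish "topic does
not exist" from "metadata refresh was just slow":

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class ShortMaxBlockSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Give up on metadata after 5s instead of the 60s default. As a
        // global config this also bounds buffer allocation and txn calls.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
        // Keep the buffer large so allocation itself rarely blocks.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
            String.valueOf(64L * 1024 * 1024));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer())) {
            try {
                producer.send(new ProducerRecord<>("maybe-missing-topic", "value"));
            } catch (TimeoutException e) {
                // Thrown synchronously by waitOnMetadata(): could mean the
                // topic is missing, or the cluster was briefly unreachable.
            }
        }
    }
}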

Cheers,
Rhys

On Sat, Jul 4, 2020 at 8:36 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello,
>
> Thanks for reaching out to the community about this. I think (as maybe
> you've also suggested) it is more an observation about the producer
> client than about the streams client. Generally speaking, we want to know
> whether we can fail fast if the metadata cannot be found in the
> producer.send() call. Here are my thoughts:
>
> 1) Caching the metadata outside the producer, e.g. in an admin client,
> would not be a perfect solution, since either way your metadata cache,
> whether inside the producer or inside the admin client, is not guaranteed
> to always be up to date: e.g. maybe you've decided to fail a record since
> its topic was not in the cache, but one second later the metadata gets
> refreshed and contains that topic.
>
> 2) Letting the send() call fail with an UnknownTopicOrPartitionException
> and pushing the burden onto the caller to decide what to do (either wait
> and retry, or give up and stop the world, etc.) may work, but that
> requires modifying the interface semantics, or at least adding an
> overloaded "send()". Maybe worth discussing in a KIP.
>
> 3) For your specific case, if you believe the metadata should be static
> and not change (i.e. you assume all topics are pre-created and none are
> created later), then I think setting max.block.ms to a smaller value and
> just catching TimeoutException is fine, since for send() itself
> max.block.ms is only used for the metadata refresh and for buffer
> allocation when the buffer is insufficient, and the latter should be a
> rare case assuming you set buffer.memory to be reasonably large. But note
> that since max.block.ms is a global config it may also affect other
> blocking calls, like txn-related ones.
>
>
> On Wed, Jul 1, 2020 at 6:10 PM Rhys Anthony McCaig <mcc...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have recently been working on a streams application that uses a
> > TopicNameExtractor to dynamically route records based on the payload.
> > This streams application is used by various other applications, and
> > occasionally these other applications request that a record be sent to
> > a non-existent topic - rather than the topic being created, the message
> > should be logged and dropped.
> >
> > Unfortunately, I haven't found a reliable way to implement this
> > behaviour: I originally hoped to catch these scenarios in a
> > ProductionExceptionHandler by catching an
> > UnknownTopicOrPartitionException, however the current producer
> > behaviour is to wait for max.block.ms in waitOnMetadata() for
> > partitions to be returned for the topic before throwing a
> > TimeoutException. If, after refreshing metadata, there are still no
> > partitions for the requested topic, it will continue to request an
> > update until the timeout is reached: (
> > https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
> > )
> >
> > For my use case, there are two challenges here:
> > 1. ProductionExceptionHandler must catch TimeoutException and inspect
> > the message to determine that the exception was caused by not finding
> > the topic in the metadata.
> > 2. The streams task blocks (as expected) while the producer is fetching
> > metadata, holding up processing of other records, until the timeout
> > exception is thrown.
> >
> > Rather than accept the stream blocking in this scenario, my current
> > thinking is to use AdminClient to keep a periodically updated cache of
> > existing/non-existing topics and filter based on this - however I can't
> > stop thinking that this feels clunky, given the producer maintains its
> > own cache of recently accessed topics/partitions.
> >
> > Would it make sense to enhance KafkaProducer to:
> > - Optionally fail fast when the first metadata refresh does not return
> > the requested topic, or partition count? (And maybe even optionally
> > cache this?)
> > - Differentiate between a TimeoutException and an
> > UnknownTopicOrPartitionException?
> >
> > My understanding of the internals isn't great - I'm not clear on the
> > reason to continue requesting metadata updates after getting a new
> > version - is there a possible issue with getting stale metadata from
> > brokers?
> >
> > Looking forward to your thoughts!
> >
> >
>
>
> --
> -- Guozhang
>