Hi All,

I have recently been working on a Streams application that uses a
TopicNameExtractor to dynamically route records based on the payload. This
Streams application is used by various other applications, and occasionally
these other applications request that a record be sent to a non-existent
topic. Rather than creating the topic, the application should log the message
and drop it.

Unfortunately, I have not found a reliable way to implement this
behaviour. I originally hoped to catch these scenarios in a
ProductionExceptionHandler by checking for an
UnknownTopicOrPartitionException. However, the current producer behaviour is
to wait up to max.block.ms in waitOnMetadata() for partitions to be returned
for the topic before throwing a TimeoutException. If, after refreshing
metadata, there are still no partitions for the requested topic, it will
continue to request an update until the timeout is reached:
https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
For my use case, there are two challenges here:
1. ProductionExceptionHandler must catch TimeoutException and inspect the
message to determine that the exception was caused by not finding the topic
in the metadata
2. The streams task blocks (as expected) while the producer is fetching
metadata, holding up processing of other records, until the timeout
exception is thrown.
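
To illustrate point 1, the message inspection could look roughly like the
sketch below. It assumes the producer's timeout message has the form
"Topic <name> not present in metadata after <n> ms.", which is an
implementation detail that may differ between client versions. The class
MissingTopicDetector and its method are hypothetical names, and the sketch
deliberately takes a plain Throwable so that it has no Kafka dependency; in a
real handler the check would be guarded by an instanceof test for
org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the missing topic name from a producer timeout message. */
final class MissingTopicDetector {

    // ASSUMPTION: wording of the producer's metadata timeout message.
    // It is not part of any public contract and may change between versions.
    private static final Pattern TOPIC_NOT_IN_METADATA =
            Pattern.compile("^Topic (\\S+) not present in metadata after \\d+ ms\\.?$");

    private MissingTopicDetector() {
    }

    /** Returns the topic name if the error looks like a "topic not in metadata" timeout. */
    static Optional<String> missingTopic(Throwable error) {
        if (error == null || error.getMessage() == null) {
            return Optional.empty();
        }
        Matcher matcher = TOPIC_NOT_IN_METADATA.matcher(error.getMessage());
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

A handler could log and return CONTINUE when missingTopic(...) is present and
FAIL otherwise. This only addresses the classification problem; the task
still blocks for max.block.ms before the handler is ever called.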

Rather than accept the stream blocking in this scenario, my current
thinking is to use AdminClient to maintain a periodically refreshed cache of
existing/non-existing topics and filter on it. However, I can't help
thinking that this feels clunky, given that the producer maintains its own
cache of recently accessed topics/partitions.
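
A minimal sketch of the cache idea, to make the trade-off concrete. The class
TopicExistenceCache and its methods are hypothetical names. To keep the
example free of a Kafka dependency, the topic listing is passed in as a
Supplier; in practice it would presumably wrap
AdminClient.listTopics().names().get() and convert its checked exceptions to
unchecked ones.

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: a periodically refreshed snapshot of existing topic names. */
final class TopicExistenceCache implements AutoCloseable {

    // Supplies the current set of topic names (assumed to wrap an AdminClient call).
    private final Supplier<Set<String>> topicLister;

    // Immutable snapshot, swapped atomically on each successful refresh.
    private volatile Set<String> knownTopics = Set.of();

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "topic-cache-refresh");
                thread.setDaemon(true);
                return thread;
            });

    TopicExistenceCache(Supplier<Set<String>> topicLister) {
        this.topicLister = topicLister;
        refresh(); // initial load; if it fails the cache starts empty
    }

    /** Starts background refreshes at a fixed period. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
    }

    /** Reloads the snapshot; on failure the previous snapshot is kept. */
    void refresh() {
        try {
            knownTopics = Set.copyOf(topicLister.get());
        } catch (RuntimeException e) {
            // Keep serving the last known snapshot; a real implementation should log this.
        }
    }

    /** Returns true if the topic was present at the last successful refresh. */
    boolean exists(String topic) {
        return knownTopics.contains(topic);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The topology would then filter on cache.exists(topic) before the sink, logging
and dropping anything that fails the check. The obvious weakness is staleness:
a topic created or deleted between refreshes is misclassified until the next
refresh, which is part of why this feels clunky.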

Would it make sense to enhance KafkaProducer to:
- Optionally fail fast when the first metadata refresh does not return the
requested topic, or partition count? (And maybe even optionally cache this?)
- Differentiate between a TimeoutException and an
UnknownTopicOrPartitionException?

My understanding of the internals isn't great - I'm not clear on the reason
to continue to request metadata updates after getting a new version - is
there a possible issue with getting stale metadata from brokers?

Looking forward to your thoughts!
