Hi All, I have recently been working on a streams application that uses a TopicNameExtractor to dynamically route records based on the payload. This streams application is used by various other applications, and occasionally these other applications request that a record be sent to a non-existent topic. Rather than the topic being created, the message should be logged and dropped.
Unfortunately, I haven't found a reliable way to implement this behaviour. I originally hoped to catch these scenarios in a ProductionExceptionHandler by handling an UnknownTopicOrPartitionException. However, the current producer behaviour is to wait up to max.block.ms in waitOnMetadata() for partitions to be returned for the topic before throwing a TimeoutException. If there are still no partitions for the requested topic after refreshing metadata, it will continue to request an update until the timeout is reached:

https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051

For my use case, there are two challenges here:

1. The ProductionExceptionHandler must catch TimeoutException and inspect the exception message to determine that the exception was caused by not finding the topic in the metadata.
2. The streams task blocks (as expected) while the producer is fetching metadata, holding up processing of other records, until the TimeoutException is thrown.

Rather than accept the stream blocking in this scenario, my current thinking is to use AdminClient to keep a periodically updated cache of existing/non-existent topics and filter based on this. However, I can't stop thinking that this feels clunky, given that the producer maintains its own cache of recently accessed topics/partitions.

Would it make sense to enhance KafkaProducer to:

- Optionally fail fast when the first metadata refresh does not return the requested topic or partition count? (And maybe even optionally cache this?)
- Differentiate between a TimeoutException and an UnknownTopicOrPartitionException?

My understanding of the internals isn't great. I'm not clear on the reason to continue requesting metadata updates after getting a new version. Is there a possible issue with getting stale metadata from brokers?

Looking forward to your thoughts!
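
For reference, the AdminClient-backed cache idea mentioned above could be sketched roughly as follows. This is only an illustration under stated assumptions, not a tested implementation: TopicExistenceCache and its methods are hypothetical names, and the topic lookup is abstracted behind a Supplier so the sketch compiles without Kafka on the classpath. In a real application the supplier would presumably wrap something like admin.listTopics().names().get(), with the checked exceptions converted to unchecked ones.

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Kafka): keeps a periodically refreshed
 * snapshot of the topic names that currently exist on the cluster.
 */
final class TopicExistenceCache implements AutoCloseable {

    // Assumption: in real use this wraps an AdminClient topic listing call.
    private final Supplier<Set<String>> topicLister;
    private final ScheduledExecutorService scheduler;

    // Immutable snapshot, swapped atomically on each successful refresh.
    private volatile Set<String> knownTopics = Set.of();

    TopicExistenceCache(Supplier<Set<String>> topicLister) {
        this.topicLister = topicLister;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "topic-existence-cache");
            t.setDaemon(true); // do not keep the JVM alive for the cache
            return t;
        });
    }

    /** Refreshes the snapshot once; keeps the previous one if the lookup fails. */
    void refresh() {
        try {
            knownTopics = Set.copyOf(topicLister.get());
        } catch (RuntimeException e) {
            // Lookup failed: keep serving the last known snapshot.
        }
    }

    /** Loads the snapshot immediately, then refreshes it at a fixed rate. */
    void start(long period, TimeUnit unit) {
        refresh();
        scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
    }

    /** Returns true if the topic was present in the most recent snapshot. */
    boolean exists(String topic) {
        return knownTopics.contains(topic);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The topology could then check cache.exists(topic) in a filter ahead of the sink and log and drop records whose extracted topic is missing, so the producer never blocks on metadata for that topic. The obvious trade-off is staleness: a topic created or deleted between refreshes would be misclassified until the next refresh.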