loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1666045522


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -219,6 +222,13 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         }
+        if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
+            processingExceptionHandlerSupplier = getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG));
+        } else {
+            processingExceptionHandlerSupplier = globalAppConfigs.getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+        }

Review Comment:
   @cadonna Sorry, that was my mistake. I updated the PR to invoke `getConfiguredInstance`.

   If I remember correctly, during the discussions around KIP-1033 we ruled out the possibility of overriding the processing exception handler from the topology config (which is why the config is not called **DEFAULT_** ...PROCESSING_EXCEPTION_HANDLER). With that in mind, I removed the `isTopologyOverride` condition for the processing exception handler.



--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org