loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1666045522


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -219,6 +222,13 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
         }
 
+        if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
+            processingExceptionHandlerSupplier = getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG));
+        } else {
+            processingExceptionHandlerSupplier = globalAppConfigs.getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+        }

Review Comment:
   @cadonna Sorry, that was my mistake. I updated the PR to invoke 
`getConfiguredInstance`.
   
   If I remember correctly, during the discussions around KIP-1033 we ruled out 
the possibility of overriding the processing exception handler from the topology 
config (which is why the config is not called **DEFAULT_** 
...PROCESSING_EXCEPTION_HANDLER). Considering this, I removed the 
`isTopologyOverride` condition for the processing exception handler.
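
To illustrate why `getClass` was the wrong call here, below is a minimal, self-contained sketch of the difference between the two lookups. It is not the PR code: `SketchConfig`, `SketchHandler`, `LogAndFailSketchHandler`, and the key string are hypothetical stand-ins, and the real config class does considerably more. The only point is that `getClass` hands back a `Class` object, while `getConfiguredInstance` returns an instance that has already been created and configured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a handler interface; not the real Kafka Streams type.
interface SketchHandler {
    void configure(Map<String, ?> configs);
    boolean isConfigured();
}

// Hypothetical handler implementation used only for this illustration.
class LogAndFailSketchHandler implements SketchHandler {
    private boolean configured = false;

    @Override
    public void configure(final Map<String, ?> configs) {
        configured = true;
    }

    @Override
    public boolean isConfigured() {
        return configured;
    }
}

// Simplified model of a config object (an assumption made for this sketch).
class SketchConfig {
    private final Map<String, Object> values;

    SketchConfig(final Map<String, Object> values) {
        this.values = values;
    }

    // Returns only the Class object: nothing is instantiated or configured.
    Class<?> getClass(final String key) {
        return (Class<?>) values.get(key);
    }

    // Instantiates the class, checks its type, and passes the configs to it.
    <T> T getConfiguredInstance(final String key, final Class<T> type) {
        final Class<?> clazz = getClass(key);
        try {
            final Object instance = clazz.getDeclaredConstructor().newInstance();
            if (!type.isInstance(instance)) {
                throw new IllegalArgumentException(
                    clazz.getName() + " is not an instance of " + type.getName());
            }
            if (instance instanceof SketchHandler) {
                ((SketchHandler) instance).configure(values);
            }
            return type.cast(instance);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + clazz.getName(), e);
        }
    }
}

class ConfiguredInstanceSketch {
    public static void main(final String[] args) {
        final Map<String, Object> values = new HashMap<>();
        // The key name is illustrative only.
        values.put("sketch.handler", LogAndFailSketchHandler.class);
        final SketchConfig config = new SketchConfig(values);

        // A Class object: the caller would still have to instantiate and configure it.
        final Class<?> handlerClass = config.getClass("sketch.handler");
        // A ready-to-use, already configured handler instance.
        final SketchHandler handler =
            config.getConfiguredInstance("sketch.handler", SketchHandler.class);

        System.out.println(handlerClass.getSimpleName());
        System.out.println(handler.isConfigured());
    }
}
```

With the `isTopologyOverride` branch removed, only the global config lookup remains, so the supplier would always resolve the handler from `globalAppConfigs`.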



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

