C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091084063


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> 
classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", 
object.getClass().getName()), ex);
+                    }

Review Comment:
   This can be simplified a bit:
   
   ```suggestion
                       Utils.closeQuietly((AutoCloseable) object, 
"AutoCloseable object constructed and configured during failed call to 
getConfiguredInstances");
   
   ```
   
   1. `Utils::closeQuietly` handles failures for us
   2. `Closeable` is a subinterface of `AutoCloseable`, so we only need to 
check for the latter



##########
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##########
@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects 
configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure 
may not have cleaned up listener thread resource.");

Review Comment:
   It seems like the failure message here is hinting that we try to create a 
Kafka producer in this interceptor, but there isn't much else in the class to 
go along with that.
   
   Could we use a more generic message like "Failed to instantiate interceptor 
(reached throw-on-config threshold)"?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -503,6 +505,30 @@ public void testInterceptorConstructorClose() {
         }
     }
 
+    @Test
+    public void 
testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances()
 {

Review Comment:
   I like this test. Could we also get a matching one for producer 
interceptors, and something analogous for the `AbstractConfig` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to