[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-02-15 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107962390


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -389,22 +389,27 @@ public void logUnused() {
     private <T> T getConfiguredInstance(Object klass, Class<T> t, Map configPairs) {
         if (klass == null)
             return null;
+        Object o = null;
+        try {
+            if (klass instanceof String) {
+                try {
+                    o = Utils.newInstance((String) klass, t);
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException("Class " + klass + " cannot be found", e);
+                }
+            } else if (klass instanceof Class) {
+                o = Utils.newInstance((Class) klass);
+            } else
+                throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
+            if (!t.isInstance(o))
+                throw new KafkaException(klass + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(configPairs);
+        } catch (Exception e) {
+            maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
 

Review Comment:
   @C0urante: I removed all extra blank lines from `getConfiguredInstance`.
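
For readers following the hunk above, the pattern can be sketched in isolation: construct the object, configure it, and close it if anything throws along the way. This is a minimal, self-contained illustration and not the Kafka implementation. `Plugin`, `FailingPlugin`, `createConfigured`, and `closeQuietly` are hypothetical names that loosely stand in for `Configurable`, a test mock, `getConfiguredInstance`, and `maybeClose`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CloseOnFailureSketch {

    /** Hypothetical stand-in for a plugin that is both configurable and closeable. */
    interface Plugin extends AutoCloseable {
        void configure(Map<String, ?> configs);

        @Override
        void close();
    }

    /** Counts close() calls so the cleanup can be observed. */
    static final AtomicInteger CLOSE_COUNT = new AtomicInteger();

    /** A plugin whose configure() always fails. */
    static class FailingPlugin implements Plugin {
        @Override
        public void configure(Map<String, ?> configs) {
            throw new IllegalStateException("configure failed");
        }

        @Override
        public void close() {
            CLOSE_COUNT.incrementAndGet();
        }
    }

    /** Closes the object if it exists; a close failure must not mask the original error. */
    static void closeQuietly(AutoCloseable o) {
        if (o == null)
            return;
        try {
            o.close();
        } catch (Exception e) {
            System.err.println("Close failed: " + e);
        }
    }

    /** Creates and configures a plugin; on failure, closes what was created and rethrows. */
    static Plugin createConfigured(Supplier<Plugin> factory, Map<String, ?> configs) {
        Plugin p = null;
        try {
            p = factory.get();
            p.configure(configs);
            return p;
        } catch (RuntimeException e) {
            closeQuietly(p);
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            createConfigured(FailingPlugin::new, Collections.singletonMap("client.id", "test"));
        } catch (IllegalStateException expected) {
            System.out.println("close() calls after failure: " + CLOSE_COUNT.get());
        }
    }
}
```

Declaring the reference before the `try` block is what makes it visible in the `catch` block. If the factory itself throws, the reference is still `null` and there is nothing to close.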



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-02-15 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107961671


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -599,6 +624,31 @@ public TestConfig(Map props) {
 }
 }
 
+    private static class InterceptorTestConfig extends AbstractConfig {
+        private final int targetInterceptor = 3;
+        private static final ConfigDef CONFIG;
+        private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
+
+        public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+        public static final String CLIENT_ID_CONFIG = "client.id";
+        public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+        static {
+            CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
+                    Type.LIST,
+                    Collections.emptyList(),
+                    Importance.LOW,
+                    INTERCEPTOR_CLASSES_CONFIG_DOC);
+        }
+        public InterceptorTestConfig(Map props) {
+            super(CONFIG, props);
+        }
+
+        public int getTargetInterceptor() {
+            return targetInterceptor;
+        }
+    }

Review Comment:
   @C0urante No apologies necessary.  I've reverted the code back to your 
preference.






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-02-02 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
 
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR, org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow-up to your earlier comments, another value of this test is that it documents the intent of the feature. In this case (pun intended), a developer looking at this test would rightfully wonder what `TestConfig.METRIC_REPORT_CLASSES_CONFIG` has to do with interceptors.
   
   Furthermore, using `TestConfig` in the way you suggested adds confusion to the test case, as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.
   
   **NOTE:** Although not in scope, `TestConfig` really should be renamed to `MetricReporterTestConfig`, as this more accurately reflects the implementation.
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class, as it is consistent with the collaborating test subject (e.g. `MockConsumerInterceptor`) and reduces noise when reasoning about the test case. Additionally, I've modified your suggested test case below and included a slightly updated version of the `InterceptorTestConfig` class for your review:
   
   ```
   @Test
   public void testConfiguredInstancesClosedOnFailure() {
       try {
           Map props = new HashMap<>();
           String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName();
           props.put(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
           props.put(InterceptorTestConfig.CLIENT_ID_CONFIG, "test");

           InterceptorTestConfig interceptorTestConfig = new InterceptorTestConfig(props);
           MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(interceptorTestConfig.getTargetInterceptor());
           assertThrows(
               Exception.class,
               () -> interceptorTestConfig.getConfiguredInstances(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
           );
           assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
           assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
       } finally {
           MockConsumerInterceptor.resetCounters();
       }
   }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
       private final int targetInterceptor = 3;
       private static final ConfigDef CONFIG;
       private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";

       public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
       public static final String CLIENT_ID_CONFIG = "client.id";
       public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

       static {
           CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                   Type.LIST,
                   "",
                   Importance.LOW,
                   INTERCEPTOR_CLASSES_CONFIG_DOC);
       }
       public InterceptorTestConfig(Map props) {
           super(CONFIG, props);
       }

       public int getTargetInterceptor() {
           return targetInterceptor;
       }
   }
   ```
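
As a rough illustration of why the test above expects both `CONFIG_COUNT` and `CLOSE_COUNT` to equal 3, here is a self-contained sketch of the same fixture idea: a mock with static counters that throws on the Nth `configure()` call. It is not the Kafka test code. `CountingMock`, `createOne`, and `createAll` are hypothetical, and the sketch assumes that single-instance creation closes the instance that failed while list creation closes the instances created before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThresholdMockSketch {

    /** Hypothetical mock: counts configure() and close() calls and fails on the Nth configure(). */
    static class CountingMock implements AutoCloseable {
        static final AtomicInteger CONFIG_COUNT = new AtomicInteger();
        static final AtomicInteger CLOSE_COUNT = new AtomicInteger();
        static final AtomicInteger THROW_THRESHOLD = new AtomicInteger(-1);

        void configure() {
            if (CONFIG_COUNT.incrementAndGet() == THROW_THRESHOLD.get())
                throw new IllegalStateException("configure failed on call " + CONFIG_COUNT.get());
        }

        @Override
        public void close() {
            CLOSE_COUNT.incrementAndGet();
        }

        static void resetCounters() {
            CONFIG_COUNT.set(0);
            CLOSE_COUNT.set(0);
            THROW_THRESHOLD.set(-1);
        }
    }

    /** Creates and configures one mock, closing it if configure() throws. */
    static CountingMock createOne() {
        CountingMock mock = new CountingMock();
        try {
            mock.configure();
            return mock;
        } catch (RuntimeException e) {
            mock.close();
            throw e;
        }
    }

    /** Creates n mocks; if any creation fails, closes the ones created earlier. */
    static List<CountingMock> createAll(int n) {
        List<CountingMock> created = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++)
                created.add(createOne());
            return created;
        } catch (RuntimeException e) {
            for (CountingMock mock : created)
                mock.close();
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            CountingMock.THROW_THRESHOLD.set(3);
            try {
                createAll(3);
            } catch (IllegalStateException expected) {
                // The third configure() call throws.
            }
            System.out.println("configured=" + CountingMock.CONFIG_COUNT.get()
                    + " closed=" + CountingMock.CLOSE_COUNT.get());
        } finally {
            // Static counters leak between tests unless they are reset.
            CountingMock.resetCounters();
        }
    }
}
```

Under these assumptions the failing third instance is closed by the single-instance path and the first two by the list path, which gives three closes in total. Resetting the static counters in `finally` keeps one test from affecting the next.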
   
   
   




[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-02-01 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
 
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
 testInvalidInputs("test1,test2");
 
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  
org.apache.kafka.test.MockConsumerInterceptor.class);
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  
org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow-up to your earlier comments, another value of this test is that it documents the intent of the feature. In this case (pun intended), a developer looking at this test would rightfully wonder what `TestConfig.METRIC_REPORT_CLASSES_CONFIG` has to do with interceptors.
   
   Furthermore, using `TestConfig` in the way you suggested adds confusion to the test case, as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.
   
   **NOTE:** Although not in scope, `TestConfig` really should be renamed to `MetricReporterTestConfig`, as this more accurately reflects the implementation.
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class, as it is consistent with the collaborating test subject (e.g. `MockConsumerInterceptor`) and reduces noise when reasoning about the test case. Additionally, I've modified your suggested test case below and included a slightly updated version of the `InterceptorTestConfig` class for your review:
   
   ```
   @Test
   public void testConfiguredInstancesClosedOnFailure() {
       try {
           Map props = new HashMap<>();
           String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName();
           props.put(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
           props.put(TestInterceptorConfig.CLIENT_ID_CONFIG, "test");
           TestConfig testConfig = new TestConfig(props);

           MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(TestInterceptorConfig.getTargetInterceptor());
           assertThrows(
               Exception.class,
               () -> testConfig.getConfiguredInstances(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
           );
           assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
           assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
       } finally {
           MockConsumerInterceptor.resetCounters();
       }
   }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
       private final int targetInterceptor = 3;
       private static final ConfigDef CONFIG;
       private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";

       public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
       public static final String CLIENT_ID_CONFIG = "client.id";
       public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

       static {
           CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                   Type.LIST,
                   "",
                   Importance.LOW,
                   INTERCEPTOR_CLASSES_CONFIG_DOC);
       }
       public InterceptorTestConfig(Map props) {
           super(CONFIG, props);
       }

       public int getTargetInterceptor() {
           return targetInterceptor;
       }
   }
   ```
   
   
   






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -414,34 +414,51 @@ private  T getConfiguredInstance(Object klass, 
Class t, Map T getConfiguredInstance(String key, Class t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t   The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map configOverrides) {
         Class c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {
Review Comment:
   I went ahead and extended the try/catch to enclose the `o = Utils.newInstance((String) klass, t);` call as well.






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -414,34 +414,51 @@ private  T getConfiguredInstance(Object klass, 
Class t, Map T getConfiguredInstance(String key, Class t) {
-return getConfiguredInstance(key, t, Collections.emptyMap());
+T configuredInstance = null;
+
+try {
+configuredInstance = getConfiguredInstance(key, t, 
Collections.emptyMap());
+} catch (Exception e) {
+maybeClose(configuredInstance, "AutoCloseable object constructed 
and configured during failed call to getConfiguredInstance");
+throw e;
+}
+
+return configuredInstance;
 }
 
 /**
  * Get a configured instance of the give class specified by the given 
configuration key. If the object implements
  * Configurable configure it using the configuration.
  *
- * @param key The configuration key for the class
- * @param t The interface the class should implement
+ * @param key The configuration key for the class
+ * @param t   The interface the class should implement
  * @param configOverrides override origin configs
  * @return A configured instance of the class
  */
 public  T getConfiguredInstance(String key, Class t, Map configOverrides) {
 Class c = getClass(key);
+T configuredInstance = null;
 
-return getConfiguredInstance(c, t, originals(configOverrides));
+try {
+configuredInstance = getConfiguredInstance(c, t, 
originals(configOverrides));
+} catch (Exception e) {

Review Comment:
   I've updated accordingly.  However, this assumes no resource leakage within 
the constructor.
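
The constructor caveat can be illustrated in isolation. When a constructor throws, the assignment never completes, so the caller's reference is still `null` and a `catch` block has nothing to close. The sketch below is hypothetical: `Resource`, `LeakyPlugin`, and `tryCreate` are illustrative names and not Kafka classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConstructorLeakSketch {

    /** Tracks how many hypothetical resources are currently open. */
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();

    static class Resource implements AutoCloseable {
        Resource() {
            OPEN_RESOURCES.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN_RESOURCES.decrementAndGet();
        }
    }

    /** Acquires a resource and then fails in its constructor without cleaning up. */
    static class LeakyPlugin implements AutoCloseable {
        private final Resource resource;

        LeakyPlugin() {
            this.resource = new Resource();
            throw new IllegalStateException("constructor failed after acquiring a resource");
        }

        @Override
        public void close() {
            resource.close();
        }
    }

    /** Caller-side cleanup attempt; returns true only if an instance was created. */
    static boolean tryCreate() {
        LeakyPlugin plugin = null;
        try {
            plugin = new LeakyPlugin();
        } catch (RuntimeException e) {
            // The assignment never happened, so plugin is still null here
            // and the caller cannot release what the constructor acquired.
            if (plugin != null)
                plugin.close();
        }
        return plugin != null;
    }

    public static void main(String[] args) {
        boolean created = tryCreate();
        System.out.println("created=" + created + " openResources=" + OPEN_RESOURCES.get());
    }
}
```

Only the constructor itself can release what it acquired before throwing, which is why caller-side cleanup has to assume well-behaved constructors.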






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092516181


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
 private final int defaultApiTimeoutMs = 6;
 private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
 private final int heartbeatIntervalMs = 1000;
+private final int targetInterceptor = 3;

Review Comment:
   Just an observation: the variable `private final int throttleMs = 10;` is also used in only one test case.
   
   






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092513735


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
 
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
 testInvalidInputs("test1,test2");
 
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  
org.apache.kafka.test.MockConsumerInterceptor.class);
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  
org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   @C0urante  Done!






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092017609


##
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##
@@ -55,6 +58,11 @@ public void configure(Map configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource.");

Review Comment:
   Done!






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public  List getConfiguredInstances(List 
classNames, Class t, M
             return objects;
         Map configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante I'm glad to hear that you are feeling better.
   Yes, given the KIP avoidance constraint, I really did not want to change `getConfiguredInstances`, but it seemed the only way to address this was to change it very carefully. I believe I've implemented each of your comments. Thanks for the excellent feedback.
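
One detail of the hunk above may be worth spelling out: `java.io.Closeable` extends `java.lang.AutoCloseable`, so an `instanceof AutoCloseable` check already covers every `Closeable`, and a following `else if (object instanceof Closeable)` branch is never taken. The sketch below shows a single-branch cleanup loop under that observation. `closeAll` is a hypothetical helper and not part of Kafka.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

public class CloseAllSketch {

    /**
     * Closes every AutoCloseable in the list and returns how many closed cleanly.
     * A failing close() is reported and skipped so the remaining objects are still closed.
     */
    static int closeAll(List<?> objects) {
        int closed = 0;
        for (Object object : objects) {
            if (object instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) object).close();
                    closed++;
                } catch (Exception e) {
                    System.err.println("Close exception on " + object.getClass().getName() + ": " + e);
                }
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        Closeable ioCloseable = () -> { };       // declared only as java.io.Closeable
        AutoCloseable autoCloseable = () -> { };
        AutoCloseable failing = () -> {
            throw new IllegalStateException("boom");
        };

        // Closeable is a subtype of AutoCloseable, so one instanceof check is enough.
        System.out.println("Closeable is AutoCloseable: "
                + AutoCloseable.class.isAssignableFrom(Closeable.class));
        System.out.println("closed cleanly: "
                + closeAll(Arrays.asList(ioCloseable, autoCloseable, failing, "not closeable")));
    }
}
```

Catching the exception per object, as the hunk does, matters more than the type check: one failing `close()` should not prevent the rest from being closed.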






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante I'm glad to hear that you are feeling better.
   Yes, given the constraint of avoiding a KIP, I really did not want to change
   `getConfiguredInstances`, but changing it very carefully seemed to be the only
   way to address this. I believe I've implemented each of your comments.
   Thanks for the excellent feedback.






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante I'm glad to hear that you are feeling better.
   Yes, given the constraint of avoiding a KIP, I really wanted to avoid changing
   `getConfiguredInstances`, but changing it seemed to be the only way to address
   this. I believe I've implemented each of your comments. Thanks for the
   excellent feedback.






[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-30 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante I'm glad to hear that you are feeling better.
   Yes, given the constraint of avoiding a KIP, changing `getConfiguredInstances`
   seemed to be the only way to address this. I believe I've implemented each of
   your comments. Thanks for the excellent feedback.


