[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107962390

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##

@@ -389,22 +389,27 @@ public void logUnused() {
     private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
         if (klass == null)
             return null;
+        Object o = null;
+        try {
+            if (klass instanceof String) {
+                try {
+                    o = Utils.newInstance((String) klass, t);
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException("Class " + klass + " cannot be found", e);
+                }
+            } else if (klass instanceof Class<?>) {
+                o = Utils.newInstance((Class<?>) klass);
+            } else
+                throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
+            if (!t.isInstance(o))
+                throw new KafkaException(klass + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(configPairs);
+        } catch (Exception e) {
+            maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }

Review Comment:
@C0urante: I removed all extra blank lines from `getConfiguredInstance`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
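The construct, configure, and close-on-failure flow in this hunk can be tried in isolation. The following is only a minimal sketch under stated assumptions, not Kafka's code: `SketchConfigurable`, `FlakyResource`, `closeQuietly`, and `createConfigured` are hypothetical names standing in for `Configurable`, an interceptor, `maybeClose`, and `getConfiguredInstance`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a Configurable-style interface.
interface SketchConfigurable {
    void configure(Map<String, ?> configs);
}

// Hypothetical resource whose configure() always fails; it counts close() calls.
class FlakyResource implements SketchConfigurable, AutoCloseable {
    static final AtomicInteger CLOSE_COUNT = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> configs) {
        throw new IllegalStateException("configure failed");
    }

    @Override
    public void close() {
        CLOSE_COUNT.incrementAndGet();
    }
}

class CloseOnFailureSketch {
    // Hypothetical analogue of maybeClose: closes the object if it is AutoCloseable.
    static void closeQuietly(Object o) {
        if (o instanceof AutoCloseable) {
            try {
                ((AutoCloseable) o).close();
            } catch (Exception e) {
                System.err.println("close failed: " + e);
            }
        }
    }

    // Constructs and configures an instance; closes it if any step after construction fails.
    static <T> T createConfigured(Supplier<T> factory, Map<String, ?> configs) {
        T o = null;
        try {
            o = factory.get();
            if (o instanceof SketchConfigurable)
                ((SketchConfigurable) o).configure(configs);
            return o;
        } catch (RuntimeException e) {
            closeQuietly(o); // o is null if the factory itself threw
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            createConfigured(FlakyResource::new, Collections.<String, Object>emptyMap());
        } catch (IllegalStateException e) {
            System.out.println("close() calls after failure: " + FlakyResource.CLOSE_COUNT.get());
        }
    }
}
```

Declaring the reference before the `try` is what lets the `catch` block reach an instance that was constructed but failed later, for example in `configure`.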
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107961671

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ##

@@ -599,6 +624,31 @@ public TestConfig(Map<?, ?> props) {
         }
     }
 
+    private static class InterceptorTestConfig extends AbstractConfig {
+        private final int targetInterceptor = 3;
+        private static final ConfigDef CONFIG;
+        private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
+
+        public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+        public static final String CLIENT_ID_CONFIG = "client.id";
+        public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+        static {
+            CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
+                    Type.LIST,
+                    Collections.emptyList(),
+                    Importance.LOW,
+                    INTERCEPTOR_CLASSES_CONFIG_DOC);
+        }
+        public InterceptorTestConfig(Map<?, ?> props) {
+            super(CONFIG, props);
+        }
+
+        public int getTargetInterceptor() {
+            return targetInterceptor;
+        }
+    }

Review Comment:
@C0urante No apologies necessary. I've reverted the code back to your preference.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ##

@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR, org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
As a follow-up on your earlier comments, another value of this test is in conveying the intent of the coded feature. In this case (pun intended), a developer looking at this test case would rightfully wonder what `TestConfig.METRIC_REPORT_CLASSES_CONFIG` has to do with interceptors. Furthermore, using `TestConfig` in the way you suggest adds confusion to the test case, as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.

**NOTE:** Although not in scope, `TestConfig` really should be renamed to `MetricReporterTestConfig`, as this aligns more accurately with the implementation.

Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class, as it is consistent with the collaborating test subject (e.g. `MockConsumerInterceptor`) and reduces noise when reasoning about the test case.

Additionally, I've modified your suggested test case below, together with a slightly updated version of the `InterceptorTestConfig` class, for your review:

```
@Test
public void testConfiguredInstancesClosedOnFailure() {
    try {
        Map<String, String> props = new HashMap<>();
        String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
            + MockConsumerInterceptor.class.getName() + ", "
            + MockConsumerInterceptor.class.getName();
        props.put(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
        props.put(InterceptorTestConfig.CLIENT_ID_CONFIG, "test");
        InterceptorTestConfig interceptorTestConfig = new InterceptorTestConfig(props);
        MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(interceptorTestConfig.getTargetInterceptor());
        assertThrows(
            Exception.class,
            () -> interceptorTestConfig.getConfiguredInstances(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
        );
        assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
        assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
    } finally {
        MockConsumerInterceptor.resetCounters();
    }
}
```

```
private static class InterceptorTestConfig extends AbstractConfig {
    private final int targetInterceptor = 3;
    private static final ConfigDef CONFIG;
    private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";

    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

    static {
        CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
            Type.LIST,
            "",
            Importance.LOW,
            INTERCEPTOR_CLASSES_CONFIG_DOC);
    }

    public InterceptorTestConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }

    public int getTargetInterceptor() {
        return targetInterceptor;
    }
}
```
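The counting idea behind the suggested test (three interceptors, the third fails in `configure`, and every instance must still be closed) can be reproduced without Kafka on the classpath. The sketch below is an assumption-laden illustration: `CountingInterceptor`, `throwOnConfigure`, and `createAll` are hypothetical names that only mimic the roles of `MockConsumerInterceptor`, its threshold setter, and `getConfiguredInstances`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mock, loosely modelled on the counters referenced in the test above.
class CountingInterceptor implements AutoCloseable {
    static final AtomicInteger CONFIG_COUNT = new AtomicInteger();
    static final AtomicInteger CLOSE_COUNT = new AtomicInteger();
    static volatile int throwOnConfigure = -1; // -1 disables the injected failure

    void configure() {
        // Fail on the N-th configure() call across all instances.
        if (CONFIG_COUNT.incrementAndGet() == throwOnConfigure)
            throw new IllegalStateException("configure failed on call " + throwOnConfigure);
    }

    @Override
    public void close() {
        CLOSE_COUNT.incrementAndGet();
    }

    static void resetCounters() {
        CONFIG_COUNT.set(0);
        CLOSE_COUNT.set(0);
        throwOnConfigure = -1;
    }
}

class InterceptorCleanupSketch {
    // Creates and configures n interceptors; on failure, closes everything created so far.
    static List<CountingInterceptor> createAll(int n) {
        List<CountingInterceptor> created = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                CountingInterceptor interceptor = new CountingInterceptor();
                try {
                    interceptor.configure();
                } catch (RuntimeException e) {
                    interceptor.close(); // the instance that failed to configure
                    throw e;
                }
                created.add(interceptor);
            }
            return created;
        } catch (RuntimeException e) {
            for (CountingInterceptor interceptor : created)
                interceptor.close(); // the earlier, successfully configured instances
            throw e;
        }
    }

    public static void main(String[] args) {
        CountingInterceptor.resetCounters();
        CountingInterceptor.throwOnConfigure = 3;
        try {
            createAll(3);
        } catch (IllegalStateException expected) {
            System.out.println("configured=" + CountingInterceptor.CONFIG_COUNT.get()
                + " closed=" + CountingInterceptor.CLOSE_COUNT.get());
        } finally {
            CountingInterceptor.resetCounters();
        }
    }
}
```

In this sketch the close count matches the configure count because the failing instance is closed as well as the two that were configured successfully; if only the already-collected instances were closed, the close count would be one lower.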
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##

@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
I went ahead and extended the try/catch to enclose the `o = Utils.newInstance((String) klass, t);` call as well.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##

@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
I've updated accordingly. However, this assumes that no resources leak within the constructor itself.
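The caveat about constructors can be made concrete: when a constructor throws after acquiring something, the caller never receives a reference, so a catch-and-close wrapper has nothing to close. The sketch below is a hypothetical illustration only; `TrackedHandle` and `LeakyInterceptor` are invented for this example and do not exist in Kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical resource that tracks how many handles are currently open.
class TrackedHandle implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();

    TrackedHandle() {
        OPEN.incrementAndGet();
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

// Hypothetical interceptor whose constructor can fail after acquiring a handle.
class LeakyInterceptor implements AutoCloseable {
    private final TrackedHandle handle;

    LeakyInterceptor(boolean failAfterAcquire) {
        handle = new TrackedHandle();
        if (failAfterAcquire)
            throw new IllegalStateException("constructor failed after acquiring a handle");
    }

    @Override
    public void close() {
        handle.close();
    }
}

class ConstructorLeakSketch {
    // Returns the number of handles still open after a failed construction.
    static int openHandlesAfterFailedConstruction() {
        LeakyInterceptor o = null;
        try {
            o = new LeakyInterceptor(true);
        } catch (IllegalStateException e) {
            // The assignment never happened, so o is still null here
            // and the caller cannot release the handle.
            if (o != null)
                o.close();
        }
        return TrackedHandle.OPEN.get();
    }

    public static void main(String[] args) {
        System.out.println("handles still open: " + openHandlesAfterFailedConstruction());
    }
}
```

Cleaning up in this situation would have to happen inside the constructor itself, which is outside what a wrapper around the instantiation call can do.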
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092516181

## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ##

@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 6;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
Just an observation: the variable `private final int throttleMs = 10;` is also used in only one test case.
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092513735

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ##

@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR, org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
@C0urante Done!
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092017609

## clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java: ##

@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource.");

Review Comment:
Done!
[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138

## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ##

@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
@C0urante I'm glad to hear that you are feeling better. Yes, given the KIP avoidance constraint, I really did not want to change `getConfiguredInstances`, but the only way to address this seemed to be changing it very carefully. I believe I've implemented each of your comments. Thanks for the excellent feedback.
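One detail of the cleanup loop in this hunk is worth spelling out: `java.io.Closeable` extends `java.lang.AutoCloseable`, so any `Closeable` already matches the first `instanceof AutoCloseable` check and the `else if` branch is never taken. A minimal sketch of a single-check cleanup loop follows; `CloseAllSketch` and `closeAll` are hypothetical names used only for illustration, and the error reporting is simplified to `System.err`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CloseAllSketch {
    // Hypothetical helper: closes every AutoCloseable in the list and
    // returns how many objects were closed without an exception.
    static int closeAll(List<?> objects) {
        int closed = 0;
        for (Object object : objects) {
            // Closeable extends AutoCloseable, so this one check covers both.
            if (object instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) object).close();
                    closed++;
                } catch (Exception ex) {
                    // Keep going so one failing close() does not skip the rest.
                    System.err.println("Close exception on " + object.getClass().getName() + ": " + ex);
                }
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Closeable io = () -> calls.add("closeable");
        AutoCloseable auto = () -> calls.add("autocloseable");
        int closed = closeAll(Arrays.<Object>asList(io, auto, "not closeable"));
        System.out.println(closed + " closed: " + calls);
    }
}
```

Catching the exception per object, as the hunk also does, means a failure in one `close()` call does not prevent the remaining instances from being released.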