[
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Egerton resolved KAFKA-14565.
-----------------------------------
Resolution: Fixed
> Interceptor Resource Leak
> -------------------------
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Terry Beard
> Assignee: Terry Beard
> Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding
> Kafka Consumer and Producer constructors do not adequately support cleanup of
> underlying interceptor resources.
> Currently, within the Kafka Consumer and Kafka Producer constructors,
> *AbstractConfig.getConfiguredInstances()* is responsible for both creating
> and configuring each interceptor listed in the interceptor.classes property,
> and it returns a configured *List<ConsumerInterceptor<K,V>>* of interceptors.
> This dual responsibility for both creation and configuration is problematic
> when multiple interceptors are involved, at least one interceptor's
> configure method creates and/or depends on objects which create threads,
> connections or other resources which require clean up, and a subsequent
> interceptor's configure method raises a runtime exception.
> The runtime exception causes a resource leak in the first interceptor
> because the interceptor container, i.e.
> ConsumerInterceptors/ProducerInterceptors, is never created, and therefore
> the close method of the first interceptor (and really of any interceptor) is
> never called.
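The failure mode described above can be reproduced without Kafka on the classpath. The following is only a minimal sketch: DemoInterceptor, ResourceInterceptor, FailingInterceptor and LeakDemo are hypothetical stand-ins invented for illustration, and createAndConfigure only imitates the create-and-configure-in-one-pass behavior attributed to getConfiguredInstances(); it is not the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the interceptor contract; not the real Kafka interface.
interface DemoInterceptor extends AutoCloseable {
    void configure();

    @Override
    void close();
}

// Acquires a resource in configure() and releases it in close().
class ResourceInterceptor implements DemoInterceptor {
    boolean open = false;

    @Override
    public void configure() {
        open = true; // stands in for starting a thread or opening a connection
    }

    @Override
    public void close() {
        open = false;
    }
}

// Always fails during configuration.
class FailingInterceptor implements DemoInterceptor {
    @Override
    public void configure() {
        throw new IllegalStateException("bad config");
    }

    @Override
    public void close() {
    }
}

class LeakDemo {
    // Configures every candidate in one pass. If any configure() throws, the
    // caller never receives the list and therefore cannot close anything.
    static List<DemoInterceptor> createAndConfigure(List<DemoInterceptor> candidates) {
        List<DemoInterceptor> configured = new ArrayList<>();
        for (DemoInterceptor interceptor : candidates) {
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    public static void main(String[] args) {
        ResourceInterceptor first = new ResourceInterceptor();
        try {
            createAndConfigure(Arrays.asList(first, new FailingInterceptor()));
        } catch (IllegalStateException e) {
            // No container was built, so close() was never called on 'first'.
            System.out.println("first still open: " + first.open);
        }
    }
}
```

Running LeakDemo prints "first still open: true": the resource acquired by the first interceptor outlives the failed construction.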
> To help ensure the respective interceptor containers are able to invoke
> their interceptors' close methods for proper resource clean up, I propose
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()*
> method with no implementation and a checked exception on the respective
> Consumer/Producer interceptor interfaces. This method, as part of the
> interceptor life cycle management, will be responsible for creating threads
> and/or objects which utilize threads, connections or other resources which
> require clean up. Additionally, this default method makes implementation
> optional: its empty default behavior means it will do nothing when
> unimplemented, mitigating backwards compatibility impact on existing
> interceptors. Finally, the Kafka Consumer/Producer Interceptor containers
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources*
> or *maybeAcquireResources* method which also throws a checked exception.
> See the code excerpt below for the Consumer/Producer constructor:
> {code:java}
> List<ConsumerInterceptor<K, V>> interceptorList = (List)
>     config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
> {code}
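One possible shape for Proposal 1 is sketched below. This is an assumption-laden illustration, not the change that was merged: ResourceAwareInterceptor and InterceptorContainer are hypothetical stand-ins for the interceptor interfaces and for ConsumerInterceptors/ProducerInterceptors, and Proposal1Demo.open is an invented helper that plays the role of the constructor excerpt. The method name configureWithResources is one of the candidate names from the proposal.

```java
import java.util.List;

// Hypothetical stand-in for the interceptor interfaces; not the real Kafka API.
interface ResourceAwareInterceptor extends AutoCloseable {
    // Empty default: existing implementations compile unchanged and do nothing here.
    default void configureWithResources() throws Exception {
    }

    @Override
    void close();
}

// Hypothetical stand-in for ConsumerInterceptors/ProducerInterceptors.
class InterceptorContainer implements AutoCloseable {
    private final List<ResourceAwareInterceptor> interceptors;

    InterceptorContainer(List<ResourceAwareInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Runs the resource-acquiring step only after the container exists.
    void maybeConfigureWithResources() throws Exception {
        for (ResourceAwareInterceptor interceptor : interceptors) {
            interceptor.configureWithResources();
        }
    }

    @Override
    public void close() {
        for (ResourceAwareInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (RuntimeException e) {
                // Keep closing the remaining interceptors.
            }
        }
    }
}

class Proposal1Demo {
    // Mirrors the constructor excerpt: build the container first, then acquire resources.
    static InterceptorContainer open(List<ResourceAwareInterceptor> list) throws Exception {
        InterceptorContainer container = new InterceptorContainer(list);
        try {
            container.maybeConfigureWithResources();
        } catch (Exception e) {
            container.close(); // every interceptor gets a chance to clean up
            throw e;
        }
        return container;
    }
}
```

Because the container is constructed before any resource is acquired, a failure in a later interceptor still leaves a handle through which every earlier interceptor can be closed.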
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we
> can:
> * Create a class which inherits or wraps AbstractConfig that contains a new
> method which will return a ConfiguredInstanceResult class. This
> ConfiguredInstanceResult class will contain an optional list of successfully
> created interceptors and/or the exception which occurred while calling each
> Interceptor::configure. Additionally, it will contain a helper method to
> rethrow the exception as well as a method which returns the underlying
> exception. The caller is expected to handle the exception and perform clean
> up, e.g. call Interceptor::close on each interceptor in the list provided by
> the ConfiguredInstanceResult class.
> * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}}
> instances if/when a failure occurs
> * Add a new overloaded {{getConfiguredInstance}} /
> {{getConfiguredInstances}} variant that allows users to specify whether
> already-instantiated classes should be closed or not when a failure occurs
> * Add a new exception type to the public API that includes a list of all of
> the successfully-instantiated (and/or successfully-configured) instances
> before the error was encountered so that callers can choose how to handle the
> failure however they want (and possibly so that instantiation/configuration
> can be attempted on every class before throwing the exception)
>
>
>
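The first option under Proposal 2 could look roughly like the sketch below. Only the name ConfiguredInstanceResult comes from the description; its fields, its accessors, ResultDemo, configureAll and getOrCloseAll are all hypothetical, and each Supplier stands in for "create and configure one interceptor". It is not the approach that was actually merged for this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical result type: the instances built so far plus the first failure, if any.
final class ConfiguredInstanceResult<T> {
    private final List<T> instances;
    private final RuntimeException failure; // null when every instance succeeded

    ConfiguredInstanceResult(List<T> instances, RuntimeException failure) {
        this.instances = instances;
        this.failure = failure;
    }

    List<T> instances() {
        return instances;
    }

    Optional<RuntimeException> failure() {
        return Optional.ofNullable(failure);
    }

    // Helper that rethrows the captured exception, if there is one.
    void rethrowIfFailed() {
        if (failure != null) {
            throw failure;
        }
    }
}

class ResultDemo {
    // Each supplier stands in for "create and configure one interceptor".
    static <T> ConfiguredInstanceResult<T> configureAll(List<Supplier<T>> factories) {
        List<T> configured = new ArrayList<>();
        for (Supplier<T> factory : factories) {
            try {
                configured.add(factory.get());
            } catch (RuntimeException e) {
                // Stop at the first failure but hand back what was already built.
                return new ConfiguredInstanceResult<>(configured, e);
            }
        }
        return new ConfiguredInstanceResult<>(configured, null);
    }

    // Caller-side clean up: close whatever was built, then surface the failure.
    static <T extends AutoCloseable> List<T> getOrCloseAll(List<Supplier<T>> factories) {
        ConfiguredInstanceResult<T> result = configureAll(factories);
        if (result.failure().isPresent()) {
            for (T instance : result.instances()) {
                try {
                    instance.close();
                } catch (Exception e) {
                    // Ignore close failures so the original exception is the one reported.
                }
            }
            result.rethrowIfFailed();
        }
        return result.instances();
    }
}
```

The getOrCloseAll helper also approximates the second bullet (closing AutoCloseable instances automatically on failure); keeping that logic on the caller side is a design choice made here for illustration only.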
--
This message was sent by Atlassian Jira
(v8.20.10#820010)