[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680003#comment-17680003 ]
Terry Beard commented on KAFKA-14565:
-------------------------------------

[~ChrisEgerton] I'm a noob when it comes to participating in open source projects, so any advice on the process and goals is most welcome!

> Improve Interceptor Resource Leakage Prevention
> -----------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding
> Kafka Consumer and Producer constructors do not adequately support cleanup of
> underlying interceptor resources.
> Currently, within the Kafka Consumer and Kafka Producer constructors,
> *AbstractConfig.getConfiguredInstances()* is responsible for both creating
> and configuring each interceptor listed in the interceptor.classes property,
> and it returns a configured *List<ConsumerInterceptor<K,V>>* of interceptors.
> This dual responsibility for creation and configuration is problematic when
> multiple interceptors are involved. Suppose one interceptor's configure
> method creates, or depends on, objects which create threads, connections or
> other resources which require cleanup, and a subsequent interceptor's
> configure method raises a runtime exception. The exception produces a
> resource leak in the first interceptor, because the interceptor container
> (i.e. ConsumerInterceptors/ProducerInterceptors) is never created, and
> therefore the close method of the first interceptor, and really of any
> interceptor, is never called.
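To make the failure mode above concrete, here is a minimal, self-contained sketch. It does not use the Kafka classes: `Interceptor`, `ThreadOwningInterceptor`, `FailingInterceptor` and `createAndConfigure` are hypothetical stand-ins that only mimic the create-and-configure loop described in the issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InterceptorLeakDemo {

    // Hypothetical stand-in for ConsumerInterceptor/ProducerInterceptor (not the Kafka interface).
    interface Interceptor {
        void configure();
        void close();
    }

    // Acquires a resource (a thread) in configure(); only close() releases it.
    static class ThreadOwningInterceptor implements Interceptor {
        volatile Thread worker;

        @Override
        public void configure() {
            worker = new Thread(() -> {
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // interrupted by close(): let the thread exit
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        @Override
        public void close() {
            worker.interrupt();
        }
    }

    // Fails during configuration, like the "subsequent interceptor" in the issue.
    static class FailingInterceptor implements Interceptor {
        @Override
        public void configure() {
            throw new IllegalStateException("bad config");
        }

        @Override
        public void close() {
        }
    }

    // Mimics a combined create-and-configure loop: an exception aborts the loop
    // and the partially built list never reaches the caller.
    static List<Interceptor> createAndConfigure(List<Interceptor> candidates) {
        List<Interceptor> configured = new ArrayList<>();
        for (Interceptor interceptor : candidates) {
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    // Returns true if the first interceptor's thread is still alive after the failure.
    public static boolean leaksAfterFailure() {
        ThreadOwningInterceptor first = new ThreadOwningInterceptor();
        try {
            createAndConfigure(Arrays.asList(first, new FailingInterceptor()));
        } catch (IllegalStateException e) {
            // The caller received no list, so it has nothing to call close() on.
        }
        boolean leaked = first.worker.isAlive();
        first.close(); // clean up the demo's own thread
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("thread leaked: " + leaksAfterFailure());
    }
}
```

The demo can only observe and stop the thread because it keeps its own reference to `first`. A caller that supplies just class names through configuration has no such handle.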
> To help ensure that the interceptor containers can invoke their
> interceptors' close methods for proper resource cleanup, I propose two
> approaches:
>
> +*PROPOSAL 1*+
> Define a default *open*, *configureWithResources()* or *acquireResources()*
> method with an empty implementation and a checked exception on the respective
> Consumer/Producer interceptor interfaces. As part of interceptor life cycle
> management, this method will be responsible for creating threads and/or
> objects which utilize threads, connections or other resources which require
> cleanup. Additionally, because the default implementation is empty,
> implementing the method is optional: it does nothing when unimplemented,
> which mitigates the backwards compatibility impact on existing interceptors.
> Finally, the Kafka Consumer/Producer interceptor containers will implement a
> corresponding *maybeOpen*, *maybeConfigureWithResources* or
> *maybeAcquireResources* method which also throws a checked exception.
> See the code excerpt below for the Consumer/Producer constructor:
> {code:java}
> // Create and configure the interceptors; no resources are acquired here.
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> // The container now exists, so close() can be invoked if resource acquisition fails.
> this.interceptors.maybeConfigureWithResources();
> {code}
>
> +*PROPOSAL 2*+
> To avoid changing a public interface and the subsequent KIP process, we can
> add a new method to the *AbstractConfig* class called
> *getConfiguredInstanceResult()* which returns a
> *ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object. A call to
> *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list
> of interceptors.
> If an exception occurs during configuration of one or more interceptors,
> *AbstractConfig.getConfiguredInstanceResult()* will abort and capture the
> exception while retaining an optional list of the previously created
> interceptors. A call to *ConfiguredInstanceResult.getException()* returns an
> optional exception, at which point the caller can decide how to handle it,
> e.g. by performing cleanup. However, if the caller wants to rethrow the
> exception so that existing catch logic can handle it, it can call
> *ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFails()*. This
> method does nothing when there is no exception.
> See the code excerpt below for the Consumer/Producer constructor:
>
> {code:java}
> ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult =
>         (ConfiguredInstanceResult) config.getConfiguredInstanceResult(
>                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ConsumerInterceptor.class,
>                 Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> // Keep whatever was created so it can still be closed if configuration failed.
> List<ConsumerInterceptor<K, V>> interceptorList =
>         configuredInstanceResult.getConfiguredInstances().orElse(null);
> // Rethrow the captured exception, if any, so existing catch logic handles it.
> configuredInstanceResult.maybeThrowWhenAnyConfigurationFails();
> {code}
> In terms of impact on developers, one could argue that both approaches
> require some coding to get their full benefit. However, I'm open to either
> approach, or to another approach outside of these.
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
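As a rough illustration of the result object described under PROPOSAL 2, the sketch below shows one possible shape for it. Nothing here exists in Kafka today: the class body, the `configureAll` helper (a stand-in for the proposed *AbstractConfig.getConfiguredInstanceResult()*), and the choice to return an empty Optional when no instance was created are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class ConfiguredInstanceResultSketch {

    // Hypothetical result holder: the instances configured so far plus the first failure, if any.
    public static final class ConfiguredInstanceResult<T> {
        private final List<T> instances;
        private final RuntimeException exception; // null when every instance was configured

        ConfiguredInstanceResult(List<T> instances, RuntimeException exception) {
            this.instances = Collections.unmodifiableList(new ArrayList<>(instances));
            this.exception = exception;
        }

        // Assumption: empty Optional when nothing was created before the failure.
        public Optional<List<T>> getConfiguredInstances() {
            return instances.isEmpty() ? Optional.empty() : Optional.of(instances);
        }

        public Optional<RuntimeException> getException() {
            return Optional.ofNullable(exception);
        }

        // Does nothing when configuration succeeded.
        public void maybeThrowWhenAnyConfigurationFails() {
            if (exception != null) {
                throw exception;
            }
        }
    }

    // Stand-in for the proposed method: stops at the first failure but keeps
    // the instances that were already created.
    public static <T> ConfiguredInstanceResult<T> configureAll(List<Supplier<T>> factories) {
        List<T> done = new ArrayList<>();
        for (Supplier<T> factory : factories) {
            try {
                done.add(factory.get());
            } catch (RuntimeException e) {
                return new ConfiguredInstanceResult<>(done, e);
            }
        }
        return new ConfiguredInstanceResult<>(done, null);
    }

    // Demo: the first factory succeeds and the second fails. The caller closes
    // what was created and returns how many instances it closed.
    public static int cleanUpAfterFailure() {
        int[] closed = {0};
        List<Supplier<AutoCloseable>> factories = new ArrayList<>();
        factories.add(() -> () -> closed[0]++);
        factories.add(() -> {
            throw new IllegalStateException("bad config");
        });

        ConfiguredInstanceResult<AutoCloseable> result = configureAll(factories);
        try {
            result.maybeThrowWhenAnyConfigurationFails();
        } catch (RuntimeException e) {
            for (AutoCloseable c : result.getConfiguredInstances().orElse(Collections.emptyList())) {
                try {
                    c.close();
                } catch (Exception ignored) {
                    // best-effort cleanup in this sketch
                }
            }
            // A real constructor would rethrow here after cleanup.
        }
        return closed[0];
    }

    public static void main(String[] args) {
        System.out.println("instances closed: " + cleanUpAfterFailure());
    }
}
```

The point of the design is that the partially built list survives the failure, so the caller still has references on which to call close.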