[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Terry Beard updated KAFKA-14565: -------------------------------- Description: The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the Kafka Consumer and Kafka Producer constructors, the *AbstractConfig.getConfiguredInstances()* is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property and returns a configured *List<ConsumerInterceptor<K,V>>* interceptors. This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors where at least one interceptor's configure method implementation creates and/or depends on objects which creates threads, connections or other resources which requires clean up and the subsequent interceptor's configure method raises a runtime exception. This raising of the runtime exception produces a resource leakage in the first interceptor as the interceptor container i.e. ConsumerInterceptors/ProducerInterceptors is never created and therefore the first interceptor's and really any interceptor's close method are never called. To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource clean up, I propose two approaches: +*PROPOSAL 1*+ Define a default *open* or *configureWithResources()* or *acquireResources()* method with no implementation and check exception on the respective Consumer/Producer interceptor interfaces. This method as a part the interceptor life cycle management will be responsible for creating threads and/or objects which utilizes threads, connections or other resource which requires clean up. Additionally, this default method enables implementation optionality as it's empty default behavior means it will do nothing when unimplemented mitigating backwards compatibility impact to exiting interceptors. Finally, the Kafka Consumer/Producer Interceptor containers will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or *maybeAcquireResources* method which also throws a checked exception. See below code excerpt for the Consumer/Producer constructor: {code:java} List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); this.interceptors = new ConsumerInterceptors<>(interceptorList); this.interceptors.maybeConfigureWithResources(); {code} +*PROPOSAL 2*+ To avoid changing a public interface and the subsequent KIP process we can add a new method on the *AbstractConfig* class called *getConfiguredInstanceResult()* which returns a *ConfiguredInstanceResult<* *ConsumerInterceptor<K, V>>* object. A call to *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list of interceptors. If an exception occurs during configuration of one or more interceptors, the *AbstractConfig.getConfiguredInstanceResult()* will abort, capture the exception while maintaining an optional list of any previously created interceptors. A call to *ConfiguredInstanceResult.getException()* returns an optional exception at which time the caller can determine how to handle e.g. perform clean up. However, if the caller wants to rethrow the exception where existing catch logic can handle, it can call *ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed().* This method will do nothing when there is no exception. See below code excerpt example for the Consumer/Producer constructor: {code:java} ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = config.getConfiguredInstanceResult( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); List<ConsumerInterceptor<K, V>> interceptorList = configuredInstanceResult.getInstances().orElse(null); configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();{code} In terms of impact to the developers, one could argue coding may be required in both approaches to get the full benefit of either. However, I'm open to either approach or another approach outside of these. was: The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the Kafka Consumer and Kafka Producer constructors, the *AbstractConfig.getConfiguredInstances()* is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property and returns a configured *List<ConsumerInterceptor<K,V>>* interceptors. This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors where at least one interceptor's configure method implementation creates and/or depends on objects which creates threads, connections or other resources which requires clean up and the subsequent interceptor's configure method raises a runtime exception. This raising of the runtime exception produces a resource leakage in the first interceptor as the interceptor container i.e. ConsumerInterceptors/ProducerInterceptors is never created and therefore the first interceptor's and really any interceptor's close method are never called. To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource clean up, I propose two approaches: +*PROPOSAL 1*+ Define a default *open* or *configureWithResources()* or *acquireResources()* method with no implementation and check exception on the respective Consumer/Producer interceptor interfaces. This method as a part the interceptor life cycle management will be responsible for creating threads and/or objects which utilizes threads, connections or other resource which requires clean up. Additionally, this default method enables implementation optionality as it's empty default behavior means it will do nothing when unimplemented mitigating backwards compatibility impact to exiting interceptors. Finally, the Kafka Consumer/Producer Interceptor containers will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or *maybeAcquireResources* method which also throws a checked exception. See below code excerpt for the Consumer/Producer constructor: {code:java} List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); this.interceptors = new ConsumerInterceptors<>(interceptorList); this.interceptors.maybeConfigureWithResources(); {code} +*PROPOSAL 2*+ To avoid changing a public interface and the subsequent KIP process we can add a new method on the *AbstractConfig* class called *getConfiguredInstanceResult()* which returns a *ConfiguredInstanceResult<* *ConsumerInterceptor<K, V>>* object. A call to *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list of interceptors. If an exception occurs during configuration of one or more interceptors, the *AbstractConfig.getConfiguredInstanceResult()* will abort, capture the exception and maintain an optional list of any previously created interceptors. A call to *ConfiguredInstanceResult.getException()* returns an optional exception at which time the caller can determine how to handle e.g. perform clean up. However, if the caller wants to rethrow the exception where existing catch logic can handle, it can call *ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed().* This method will do nothing when there is no exception. See below code excerpt example for the Consumer/Producer constructor: {code:java} ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = config.getConfiguredInstanceResult( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); List<ConsumerInterceptor<K, V>> interceptorList = configuredInstanceResult.getInstances().orElse(null); configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();{code} In terms of impact to the developers, one could argue coding may be required in both approaches to get the full benefit of either. However, I'm open to either approach or another approach outside of these. > Improve Interceptor Resource Leakage Prevention > ----------------------------------------------- > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: Terry Beard > Assignee: Terry Beard > Priority: Major > Labels: needs-kip > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > *AbstractConfig.getConfiguredInstances()* is delegated responsibility for > both creating and configuring each interceptor listed in the > interceptor.classes property and returns a configured > *List<ConsumerInterceptor<K,V>>* interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects which > creates threads, connections or other resources which requires clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. > ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's and really any interceptor's close method are never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > two approaches: > +*PROPOSAL 1*+ > Define a default *open* or *configureWithResources()* or *acquireResources()* > method with no implementation and check exception on the respective > Consumer/Producer interceptor interfaces. This method as a part the > interceptor life cycle management will be responsible for creating threads > and/or objects which utilizes threads, connections or other resource which > requires clean up. Additionally, this default method enables implementation > optionality as it's empty default behavior means it will do nothing when > unimplemented mitigating backwards compatibility impact to exiting > interceptors. Finally, the Kafka Consumer/Producer Interceptor containers > will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* > or *maybeAcquireResources* method which also throws a checked exception. > See below code excerpt for the Consumer/Producer constructor: > {code:java} > List<ConsumerInterceptor<K, V>> interceptorList = (List) > config.getConfiguredInstances( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > this.interceptors = new ConsumerInterceptors<>(interceptorList); > this.interceptors.maybeConfigureWithResources(); > {code} > +*PROPOSAL 2*+ > To avoid changing a public interface and the subsequent KIP process we can > add a new method on the *AbstractConfig* class called > *getConfiguredInstanceResult()* which returns a *ConfiguredInstanceResult<* > *ConsumerInterceptor<K, V>>* object. A call to > *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list > of interceptors. > If an exception occurs during configuration of one or more interceptors, the > *AbstractConfig.getConfiguredInstanceResult()* will abort, capture the > exception while maintaining an optional list of any previously created > interceptors. A call to *ConfiguredInstanceResult.getException()* returns an > optional exception at which time the caller can determine how to handle e.g. > perform clean up. However, if the caller wants to rethrow the exception > where existing catch logic can handle, it can call > *ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed().* This > method will do nothing when there is no exception. > See below code excerpt example for the Consumer/Producer constructor: > {code:java} > ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult > = config.getConfiguredInstanceResult( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > List<ConsumerInterceptor<K, V>> interceptorList = > configuredInstanceResult.getInstances().orElse(null); > configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();{code} > In terms of impact to the developers, one could argue coding may be required > in both approaches to get the full benefit of either. However, I'm open to > either approach or another approach outside of these. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)