[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-23 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, 
*AbstractConfig.getConfiguredInstances()* is responsible for both creating and 
configuring each interceptor listed in the interceptor.classes property, and 
returns a configured *List<ConsumerInterceptor<K, V>>* of interceptors.

This dual responsibility for creation and configuration is problematic when 
multiple interceptors are involved, at least one interceptor's configure 
method creates (or depends on objects which create) threads, connections or 
other resources which require cleanup, and a subsequent interceptor's 
configure method raises a runtime exception.  The exception leaks the first 
interceptor's resources: the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
close method of the first interceptor, and really of any interceptor, is never 
called. 
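
The failure mode described above can be reproduced in isolation.  The 
following is a minimal sketch, not Kafka code: SketchInterceptor, 
ThreadedInterceptor, FailingInterceptor and LeakDemo.configureAll are 
hypothetical stand-ins for the interceptor interfaces and for the 
create-and-configure pass, assumed here only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an interceptor's configure/close life cycle.
interface SketchInterceptor {
    void configure();
    void close();
}

// Starts a background thread in configure(), so it needs close() to clean up.
class ThreadedInterceptor implements SketchInterceptor {
    private Thread worker;
    boolean closed = false;

    @Override
    public void configure() {
        worker = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // close() interrupts the thread so it can exit.
            }
        });
        worker.setDaemon(true); // lets the JVM exit even though the thread leaks
        worker.start();
    }

    @Override
    public void close() {
        closed = true;
        if (worker != null) {
            worker.interrupt();
        }
    }

    boolean workerAlive() {
        return worker != null && worker.isAlive();
    }
}

// Fails during configure(), like a misconfigured second interceptor.
class FailingInterceptor implements SketchInterceptor {
    @Override
    public void configure() {
        throw new IllegalStateException("invalid interceptor configuration");
    }

    @Override
    public void close() { }
}

class LeakDemo {
    // Configures every interceptor in one pass; a failure part-way through
    // discards the list, so the caller never sees the earlier instances.
    static List<SketchInterceptor> configureAll(List<SketchInterceptor> candidates) {
        List<SketchInterceptor> configured = new ArrayList<>();
        for (SketchInterceptor interceptor : candidates) {
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    public static void main(String[] args) {
        ThreadedInterceptor first = new ThreadedInterceptor();
        try {
            configureAll(Arrays.asList(first, new FailingInterceptor()));
        } catch (IllegalStateException e) {
            // Nothing closed the first interceptor, so its thread keeps running.
            System.out.println("closed=" + first.closed + ", workerAlive=" + first.workerAlive());
        }
    }
}
```

The caller only receives the exception, never the partially built list, so it 
has no handle on which to call close.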
 

To help ensure that the interceptor containers are able to invoke each 
interceptor's close method for proper resource cleanup, I propose two 
approaches:

+*PROPOSAL 1*+
Define a default *open()* or *configureWithResources()* or 
*acquireResources()* method with an empty implementation and a checked 
exception on the respective Consumer/Producer interceptor interfaces.  As part 
of interceptor life cycle management, this method will be responsible for 
creating threads and/or objects which use threads, connections or other 
resources which require cleanup.  Additionally, because its default behavior 
is empty, implementing it is optional: it does nothing when unimplemented, 
which mitigates the backwards compatibility impact on existing interceptors.  
Finally, the Kafka Consumer/Producer interceptor containers will implement a 
corresponding *maybeOpen()* or *maybeConfigureWithResources()* or 
*maybeAcquireResources()* method which also throws a checked exception. 

See the code excerpt below for the Consumer/Producer constructor:
{code:java}
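List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Create the container first so that it is able to close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquires resources and throws a checked exception on failure.
this.interceptors.maybeConfigureWithResources();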
 {code}
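
To make the shape of Proposal 1 concrete, here is a minimal, self-contained 
sketch.  It assumes the method is named open() and the container method 
maybeOpen(); OpenableInterceptor, InterceptorContainer and 
RecordingInterceptor are hypothetical names, and closing every interceptor 
when open() fails is one possible policy rather than something the proposal 
prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interceptor interface; only the life cycle methods are shown.
interface OpenableInterceptor {
    // Proposed hook with an empty default, so existing implementations compile unchanged.
    default void open() throws Exception { }

    void close();
}

// Hypothetical container in the spirit of ConsumerInterceptors/ProducerInterceptors.
class InterceptorContainer {
    private final List<OpenableInterceptor> interceptors;

    InterceptorContainer(List<OpenableInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    // Opens each interceptor in order; on failure, closes all of them and rethrows.
    void maybeOpen() throws Exception {
        try {
            for (OpenableInterceptor interceptor : interceptors) {
                interceptor.open();
            }
        } catch (Exception e) {
            close();
            throw e;
        }
    }

    void close() {
        for (OpenableInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (RuntimeException e) {
                // Keep closing the remaining interceptors.
            }
        }
    }
}

// Test double that records life cycle calls and can be told to fail in open().
class RecordingInterceptor implements OpenableInterceptor {
    private final boolean failOnOpen;
    boolean opened = false;
    boolean closed = false;

    RecordingInterceptor(boolean failOnOpen) {
        this.failOnOpen = failOnOpen;
    }

    @Override
    public void open() throws Exception {
        if (failOnOpen) {
            throw new Exception("could not acquire resources");
        }
        opened = true;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class OpenDemo {
    public static void main(String[] args) {
        RecordingInterceptor first = new RecordingInterceptor(false);
        RecordingInterceptor second = new RecordingInterceptor(true);
        InterceptorContainer container = new InterceptorContainer(Arrays.asList(first, second));
        try {
            container.maybeOpen();
        } catch (Exception e) {
            System.out.println("first.closed=" + first.closed);
        }
    }
}
```

Because the container exists before any resources are acquired, a failure in 
one interceptor's open() still leaves a single object that can close the 
others.  Note that this policy also calls close on interceptors whose open() 
never ran, so close implementations would need to tolerate that.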
+*PROPOSAL 2*+

To avoid changing any public interfaces and the subsequent KIP process, we can:
 * Create a class which inherits from or wraps {{AbstractConfig}} and contains 
a new method which returns a {{ConfiguredInstanceResult}}.  The 
{{ConfiguredInstanceResult}} class will contain an optional list of 
successfully created interceptors and/or the exception which occurred while 
calling each {{Interceptor::configure}}.  Additionally, it will contain a 
helper method to rethrow the exception as well as a method which returns the 
underlying exception.  The caller is expected to handle the exception and 
perform cleanup, e.g. call {{Interceptor::close}} on each interceptor in the 
list provided by the {{ConfiguredInstanceResult}} class.
 * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
instances if/when a failure occurs.
 * Add a new overloaded {{getConfiguredInstance}} / {{getConfiguredInstances}} 
variant that allows users to specify whether already-instantiated classes 
should be closed when a failure occurs.
 * Add a new exception type to the public API that includes a list of all of 
the instances that were successfully instantiated (and/or successfully 
configured) before the error was encountered, so that callers can choose how 
to handle the failure (and possibly so that instantiation/configuration can be 
attempted on every class before throwing the exception).
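
As an illustration of the second option in the list above (closing 
already-created instances automatically on failure), here is a minimal 
sketch.  ClosingFactory.createAll and TrackedResource are hypothetical names, 
and the Supplier-based signature is an assumption made to keep the example 
self-contained; it is not the signature of any AbstractConfig method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical resource that records whether close() was called.
class TrackedResource implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class ClosingFactory {
    // Creates one instance per supplier, in order.  If a supplier throws, every
    // instance created so far that is AutoCloseable is closed, and the original
    // exception is rethrown with any close failures attached as suppressed.
    static <T> List<T> createAll(List<Supplier<T>> factories) {
        List<T> created = new ArrayList<>();
        try {
            for (Supplier<T> factory : factories) {
                created.add(factory.get());
            }
            return created;
        } catch (RuntimeException e) {
            for (T instance : created) {
                if (instance instanceof AutoCloseable) {
                    try {
                        ((AutoCloseable) instance).close();
                    } catch (Exception closeError) {
                        e.addSuppressed(closeError);
                    }
                }
            }
            throw e;
        }
    }
}
```

Checking for AutoCloseable also covers Closeable, since Closeable extends 
AutoCloseable.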

 

 

 



[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-23 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Labels:   (was: needs-kip)

> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Terry Beard
> Assignee: Terry Beard
> Priority: Major
> Fix For: 3.5.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-20 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can 
add a new method to the *AbstractConfig* class called 
*getConfiguredInstanceResult()*, which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call to 
*ConfiguredInstanceResult.getInstances()* returns an optional list of 
interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort and capture the 
exception while retaining an optional list of previously created 
interceptors.  A call to *ConfiguredInstanceResult.getException()* returns an 
optional exception, at which point the caller can determine how to handle it, 
e.g. perform cleanup.  However, if the caller wants to rethrow the exception 
so that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFails()*.  This method 
does nothing when there is no exception. 

See the code excerpt below for an example in the Consumer/Producer constructor: 
{code:java}
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult =
        (ConfiguredInstanceResult) config.getConfiguredInstanceResult(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
// Interceptors that were created before any configuration failure occurred.
List<ConsumerInterceptor<K, V>> interceptorList =
        configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFails();
{code}
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach, or to another approach outside of these.
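
For readers who want to see what such a result type might look like, here is 
a minimal, self-contained sketch.  The method names getInstances(), 
getException() and maybeThrowWhenAnyConfigurationFails() come from the 
description above; the constructor, the use of RuntimeException, the choice 
to return an empty Optional for an empty list, and the ResultDemo and 
CloseTracker helpers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Sketch of the proposed holder: the instances created so far plus an optional failure.
final class ConfiguredInstanceResult<T> {
    private final List<T> instances;
    private final RuntimeException exception; // null when nothing failed

    ConfiguredInstanceResult(List<T> instances, RuntimeException exception) {
        this.instances = Collections.unmodifiableList(new ArrayList<>(instances));
        this.exception = exception;
    }

    // Assumption: an empty list is reported as an empty Optional.
    Optional<List<T>> getInstances() {
        return instances.isEmpty() ? Optional.empty() : Optional.of(instances);
    }

    Optional<RuntimeException> getException() {
        return Optional.ofNullable(exception);
    }

    // Rethrows the captured exception, if any; otherwise does nothing.
    void maybeThrowWhenAnyConfigurationFails() {
        if (exception != null) {
            throw exception;
        }
    }
}

// Hypothetical closeable instance that records whether close() was called.
class CloseTracker implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class ResultDemo {
    // Caller-side handling: if configuration failed, close whatever was created,
    // then rethrow so that existing catch logic still sees the failure.
    static <T extends AutoCloseable> void closeAllAndRethrow(ConfiguredInstanceResult<T> result)
            throws Exception {
        if (result.getException().isPresent()) {
            for (T instance : result.getInstances().orElse(Collections.<T>emptyList())) {
                instance.close();
            }
        }
        result.maybeThrowWhenAnyConfigurationFails();
    }
}
```

Returning the partial list alongside the exception is what lets the caller 
reach the instances that would otherwise be lost when the exception 
propagates.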

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-19 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property and returns a configured  *List>* 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when it involves multiple interceptors where at least one interceptor's 
configure method implementation creates and/or depends on objects which creates 
threads, connections or other resources which requires clean up and the 
subsequent interceptor's configure method raises a runtime exception.  This 
raising of the runtime exception produces a resource leakage in the first 
interceptor as the interceptor container i.e. 
ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
first interceptor's and really any interceptor's close method are never called. 
 

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()*  
method with no implementation and check exception on the respective 
Consumer/Producer interceptor interfaces.  This method as a part the 
interceptor life cycle management will be responsible for creating threads 
and/or objects which utilizes threads, connections or other resource which 
requires clean up.  Additionally, this default method enables implementation 
optionality as it's empty default behavior means it will do nothing when 
unimplemented mitigating backwards compatibility impact to exiting 
interceptors.  Finally, the Kafka Consumer/Producer Interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

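See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}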
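
To make the life cycle in Proposal 1 more concrete, here is a minimal, self-contained sketch under stated assumptions.  OpenableInterceptor, OpenableInterceptors and OpenSketch are hypothetical stand-ins for the real interfaces and containers, and the hook is named open() only for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the interceptor interfaces; not the real Kafka API. */
interface OpenableInterceptor {
    /** Proposed life cycle hook: empty by default, so existing implementations are unaffected. */
    default void open() throws Exception { }
    void close();
}

/** Hypothetical stand-in for ConsumerInterceptors/ProducerInterceptors. */
final class OpenableInterceptors {
    private final List<OpenableInterceptor> interceptors;

    OpenableInterceptors(List<OpenableInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    /** Opens each interceptor in order; the first failure propagates to the caller. */
    void maybeOpen() throws Exception {
        for (OpenableInterceptor interceptor : interceptors) {
            interceptor.open();
        }
    }

    /** Closes every interceptor, including those whose open() was never reached. */
    void close() {
        for (OpenableInterceptor interceptor : interceptors) {
            interceptor.close();
        }
    }
}

final class OpenSketch {
    static int openThreads = 0; // simulated resource counter

    /** Returns the number of simulated resources still held after a failed open. */
    static int resourcesLeftAfterFailedOpen() {
        openThreads = 0;
        OpenableInterceptor holdsResource = new OpenableInterceptor() {
            private boolean opened = false;
            @Override public void open() { opened = true; openThreads++; }
            @Override public void close() { if (opened) { opened = false; openThreads--; } }
        };
        OpenableInterceptor failsToOpen = new OpenableInterceptor() {
            @Override public void open() throws Exception { throw new Exception("cannot connect"); }
            @Override public void close() { }
        };
        List<OpenableInterceptor> list = new ArrayList<>();
        list.add(holdsResource);
        list.add(failsToOpen);
        // The container exists before any resource is acquired.
        OpenableInterceptors container = new OpenableInterceptors(list);
        try {
            container.maybeOpen();
        } catch (Exception e) {
            container.close(); // clean up is possible because the container was created
        }
        return openThreads;
    }
}
```

Because the container is created before any resource is acquired, the caller's catch block can close every interceptor even when a later open() fails.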
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can add 
a new method on the *AbstractConfig* class called 
*getConfiguredInstanceResult()* which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call 
to *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list 
of interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort and capture the 
exception while maintaining an optional list of previously created 
interceptors.  A call to *ConfiguredInstanceResult.getException()* returns an 
optional exception, at which point the caller can determine how to handle it, 
e.g. perform clean up.  However, if the caller wants to rethrow the exception 
so that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFails()*.  This method 
will do nothing when there is no exception. 

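See the example code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed API: returns the configured instances together with any captured exception.
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = 
    config.getConfiguredInstanceResult(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was created so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFails();
{code}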
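
One possible shape for the result type in Proposal 2 is sketched below.  This is only an assumption about how it could look: ConfiguredInstanceResultSketch and ResultSketchDemo are hypothetical names, and a Runnable stands in for creating and configuring an interceptor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical shape of the proposed result type; method names follow the excerpt above. */
final class ConfiguredInstanceResultSketch<T> {
    private final List<T> instances;
    private final RuntimeException exception; // null when configuration succeeded

    ConfiguredInstanceResultSketch(List<T> instances, RuntimeException exception) {
        this.instances = Collections.unmodifiableList(new ArrayList<>(instances));
        this.exception = exception;
    }

    /** Instances configured before any failure; empty Optional when there are none. */
    Optional<List<T>> getInstances() {
        return instances.isEmpty() ? Optional.empty() : Optional.of(instances);
    }

    Optional<RuntimeException> getException() {
        return Optional.ofNullable(exception);
    }

    /** Rethrows the captured exception, or does nothing when there is none. */
    void maybeThrowWhenAnyConfigurationFails() {
        if (exception != null) {
            throw exception;
        }
    }
}

final class ResultSketchDemo {
    /** Configures candidates in order, stopping at the first failure and keeping earlier ones. */
    static ConfiguredInstanceResultSketch<Runnable> configureAll(List<Runnable> candidates) {
        List<Runnable> configured = new ArrayList<>();
        for (Runnable candidate : candidates) {
            try {
                candidate.run(); // stand-in for configure()
                configured.add(candidate);
            } catch (RuntimeException e) {
                return new ConfiguredInstanceResultSketch<>(configured, e);
            }
        }
        return new ConfiguredInstanceResultSketch<>(configured, null);
    }
}
```

With a result object like this, the caller still holds the previously configured instances when it decides whether to clean up itself or rethrow.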
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-19 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

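See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}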
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can add 
a new method on the *AbstractConfig* class called 
*getConfiguredInstanceResult()* which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call 
to *ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list 
of interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort and capture the 
exception while maintaining an optional list of previously created 
interceptors.  A call to *ConfiguredInstanceResult.getException()* returns an 
optional exception, at which point the caller can determine how to handle it, 
e.g. perform clean up.  However, if the caller wants to rethrow the exception 
so that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed()*.  This method 
will do nothing when there is no exception. 

See the example code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed API: returns the configured instances together with any captured exception.
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = 
    config.getConfiguredInstanceResult(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was created so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();
{code}
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-19 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

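See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}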
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can add 
a new method on the *AbstractConfig* class called 
*getConfiguredInstanceResult()* which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call to 
*ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list of 
interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort and capture the 
exception while maintaining an optional list of any previously created 
interceptors.  A call to *ConfiguredInstanceResult.getException()* returns an 
optional exception, at which point the caller can determine how to handle it, 
e.g. perform clean up.  However, if the caller wants to rethrow the exception 
so that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed()*.  This method 
will do nothing when there is no exception. 

See the example code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed API: returns the configured instances together with any captured exception.
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = 
    config.getConfiguredInstanceResult(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was created so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();
{code}
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-19 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

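See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}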
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can add 
a new method on the *AbstractConfig* class called 
*getConfiguredInstanceResult()* which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call to 
*ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list of 
interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort, capture the 
exception and maintain an optional list of any previously created interceptors. 
A call to *ConfiguredInstanceResult.getException()* returns an optional 
exception, at which point the caller can determine how to handle it, e.g. 
perform clean up.  However, if the caller wants to rethrow the exception so 
that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed()*.  This method 
will do nothing when there is no exception. 

See the example code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed API: returns the configured instances together with any captured exception.
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = 
    config.getConfiguredInstanceResult(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was created so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();
{code}
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-19 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

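See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}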
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can add 
a new method on the *AbstractConfig* class called 
*getConfiguredInstanceResult()* which returns a 
*ConfiguredInstanceResult<ConsumerInterceptor<K, V>>* object.  A call to 
*ConfiguredInstanceResult.getConfiguredInstances()* returns an optional list of 
interceptors. 

If an exception occurs during configuration of one or more interceptors, 
*AbstractConfig.getConfiguredInstanceResult()* will abort, capture the 
exception and maintain an optional list of any previously created interceptors. 
A call to *ConfiguredInstanceResult.getException()* returns an optional 
exception, at which point the caller can determine how to handle it, e.g. 
perform clean up.  However, if the caller wants to rethrow the exception so 
that existing catch logic can handle it, it can call 
*ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFailed()*.  This method 
will do nothing when there is no exception. 

See the example code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed API: returns the configured instances together with any captured exception.
ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult = 
    config.getConfiguredInstanceResult(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was created so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    configuredInstanceResult.getInstances().orElse(null);
// Rethrows the captured exception, if any; otherwise does nothing.
configuredInstanceResult.maybeThrowWhenAnyConfigurationFailed();
{code}
In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-18 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

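See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}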
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can 
replace the call to *AbstractConfig.getConfiguredInstances()* with a new factory 
class which wraps the *AbstractConfig* and introduces a replacement method for 
*getConfiguredInstances*, e.g. 
*interceptorLoader.loadConfiguredInstances()*.  This approach enables reuse 
of the existing *AbstractConfig* methods alongside the new and improved 
factory method.  It also enables reuse of the existing try/catch clean up 
behavior, which invokes the interceptor *close()* method, within the 
respective clients in the event of a configuration failure. 

See the code excerpt below for the Consumer/Producer constructor: 
{code:java}
// Proposed factory that wraps the existing AbstractConfig.
Loader interceptorLoader = new Loader(config);
LoadConfiguredInstanceResult loadConfiguredInstanceResult = 
    interceptorLoader.loadConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was loaded so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    loadConfiguredInstanceResult.getInstances();
// Rethrows if any interceptor failed to configure.
loadConfiguredInstanceResult.throwWhenAnyConfigurationFailed();
{code}
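
The caller-side clean up that this loader approach is meant to enable could look roughly like the sketch below.  It is a simplified illustration under assumptions: LoadResultSketch, LoaderSketch and LoaderSketchDemo are hypothetical names, and suppliers stand in for the instantiate-and-configure step that the proposed factory would perform through the wrapped *AbstractConfig*.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical result holder; method names mirror the excerpt above. */
final class LoadResultSketch<T extends AutoCloseable> {
    private final List<T> instances;
    private final RuntimeException failure; // null when every instance was configured

    LoadResultSketch(List<T> instances, RuntimeException failure) {
        this.instances = instances;
        this.failure = failure;
    }

    List<T> getInstances() { return instances; }

    void throwWhenAnyConfigurationFailed() {
        if (failure != null) throw failure;
    }
}

/** Hypothetical factory; suppliers replace the real config-driven loading. */
final class LoaderSketch {
    static <T extends AutoCloseable> LoadResultSketch<T> loadConfiguredInstances(List<Supplier<T>> factories) {
        List<T> loaded = new ArrayList<>();
        for (Supplier<T> factory : factories) {
            try {
                loaded.add(factory.get()); // stand-in for instantiate + configure
            } catch (RuntimeException e) {
                return new LoadResultSketch<>(loaded, e);
            }
        }
        return new LoadResultSketch<>(loaded, null);
    }
}

final class LoaderSketchDemo {
    static int closedCount = 0;

    /** Mimics a client constructor: keep the instances, rethrow, then clean up in the catch block. */
    static int closedAfterFailedConstruction() {
        closedCount = 0;
        List<Supplier<AutoCloseable>> factories = new ArrayList<>();
        factories.add(() -> () -> closedCount++); // configures successfully
        factories.add(() -> { throw new IllegalStateException("bad config"); });
        List<AutoCloseable> interceptorList = new ArrayList<>();
        try {
            LoadResultSketch<AutoCloseable> result = LoaderSketch.loadConfiguredInstances(factories);
            interceptorList = result.getInstances();
            result.throwWhenAnyConfigurationFailed();
        } catch (RuntimeException e) {
            for (AutoCloseable interceptor : interceptorList) {
                try { interceptor.close(); } catch (Exception ignored) { }
            }
        }
        return closedCount;
    }
}
```

The point of the sketch is that the instances are handed to the caller before the failure is rethrown, so an existing catch block has something to close.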

In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-18 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
*AbstractConfig.getConfiguredInstances()*  is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List<ConsumerInterceptor<K, V>>* of 
interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved and at least one interceptor's 
configure method creates, or depends on objects which create, threads, 
connections or other resources that require clean up, and a subsequent 
interceptor's configure method then raises a runtime exception.  The 
exception causes a resource leak in the first interceptor because the 
interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
never created, and therefore the close method of the first interceptor, and 
really of any interceptor, is never called. 

To help ensure the interceptor containers are able to invoke the close method 
of each of their interceptors for proper resource clean up, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with an empty implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as a part of the 
interceptor life cycle management, will be responsible for creating threads 
and/or objects which utilize threads, connections or other resources which 
require clean up.  Additionally, this default method makes implementation 
optional, as its empty default behavior means it will do nothing when 
unimplemented, mitigating the backwards compatibility impact on existing 
interceptors.  Finally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding *maybeOpen* or *maybeConfigureWithResources* or 
*maybeAcquireResources* method which also throws a checked exception. 

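See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Create and configure the interceptors as the constructor does today.
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
    config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Build the container first so that it can later close every interceptor.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Proposed method: acquire threads, connections, etc. once the container exists.
this.interceptors.maybeConfigureWithResources();
{code}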
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can 
replace the call to *AbstractConfig.getConfiguredInstances()* with a new factory 
class which wraps the *AbstractConfig* and introduces a replacement method for 
*getConfiguredInstances*, e.g. 
*interceptorLoader.loadConfiguredInstances()*.  This approach enables reuse 
of the existing *AbstractConfig* methods alongside the new and improved 
factory method.  It also enables reuse of the existing try/catch clean up 
behavior, which invokes the interceptor *close()* method, within the 
respective clients in the event of a configuration failure.  See the example 
code below.

 
{code:java}
// Proposed factory that wraps the existing AbstractConfig.
Loader interceptorLoader = new Loader(config);
LoadConfiguredInstanceResult loadConfiguredInstanceResult = 
    interceptorLoader.loadConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Keep whatever was loaded so that it can still be closed on failure.
List<ConsumerInterceptor<K, V>> interceptorList = 
    loadConfiguredInstanceResult.getInstances();
// Rethrows if any interceptor failed to configure.
loadConfiguredInstanceResult.throwWhenAnyConfigurationFailed();
{code}

In terms of impact on developers, one could argue that coding may be required 
in both approaches to get the full benefit of either.  However, I'm open to 
either approach or another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-18 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, 
*AbstractConfig.getConfiguredInstances()* is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured *List* of interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved.  If one interceptor's configure method 
creates and/or depends on objects that create threads, connections, or other 
resources requiring cleanup, and a subsequent interceptor's configure method 
raises a runtime exception, the first interceptor leaks its resources.  The 
leak occurs because the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
close method of the first interceptor, and really of any interceptor, is never 
called.
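
The failure mode described above can be reproduced with a minimal, 
self-contained sketch. It does not use the Kafka classes: SimpleInterceptor, 
ResourceInterceptor, FailingInterceptor and createAndConfigure() are 
hypothetical stand-ins for the interceptor interfaces and for a factory that 
creates and configures in a single call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the configure/close life cycle of a Kafka interceptor.
interface SimpleInterceptor {
    void configure();
    void close();
}

// Acquires a resource in configure(); a boolean flag models the thread or connection.
class ResourceInterceptor implements SimpleInterceptor {
    static boolean resourceOpen = false;
    public void configure() { resourceOpen = true; }
    public void close() { resourceOpen = false; }
}

// Fails during configuration, like a misconfigured second interceptor.
class FailingInterceptor implements SimpleInterceptor {
    public void configure() { throw new IllegalStateException("bad configuration"); }
    public void close() { }
}

class LeakDemo {
    // Mimics a factory that configures every instance in one call. If configure()
    // throws, the caller never receives the instances that were already configured.
    static List<SimpleInterceptor> createAndConfigure(List<SimpleInterceptor> candidates) {
        List<SimpleInterceptor> configured = new ArrayList<>();
        for (SimpleInterceptor interceptor : candidates) {
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    // Returns true when the first interceptor's resource is still open after the failure.
    static boolean leaksOnFailure() {
        ResourceInterceptor.resourceOpen = false;
        List<SimpleInterceptor> interceptors = null;
        try {
            interceptors = createAndConfigure(
                    Arrays.asList(new ResourceInterceptor(), new FailingInterceptor()));
        } catch (IllegalStateException e) {
            // No list was returned, so there is nothing to call close() on.
        }
        return interceptors == null && ResourceInterceptor.resourceOpen;
    }

    public static void main(String[] args) {
        System.out.println("leaked = " + leaksOnFailure());
    }
}
```

In this sketch the exception escapes before any list is returned, so the 
caller holds no reference through which the first interceptor could be closed.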
 

To help ensure the respective interceptor containers are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
two approaches:

+*PROPOSAL 1*+
Define a default *open* or *configureWithResources()* or *acquireResources()* 
method with no implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method, as part of interceptor 
life cycle management, will be responsible for creating threads and/or objects 
that use threads, connections, or other resources requiring cleanup.  
Additionally, this default method makes implementation optional: its empty 
default behavior means it does nothing when unimplemented, mitigating the 
backwards-compatibility impact on existing interceptors.  Finally, the Kafka 
Consumer/Producer interceptor containers will implement a corresponding 
*maybeOpen* or *maybeConfigureWithResources* or *maybeAcquireResources* method, 
which also throws a checked exception.

See the code excerpt below for the Consumer/Producer constructor:
{code:java}
// Instantiate (and configure) each interceptor listed in interceptor.classes.
List<ConsumerInterceptor<K, V>> interceptorList = (List)
        config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

// Create the container first, then acquire resources through it.
this.interceptors = new ConsumerInterceptors<>(interceptorList);
this.interceptors.maybeConfigureWithResources();
{code}
+*PROPOSAL 2*+

To avoid changing a public interface and the subsequent KIP process, we can 
replace *AbstractConfig.getConfiguredInstances()* with a new factory class 
that wraps the *AbstractConfig* while introducing a replacement method for 
*getConfiguredInstances*, e.g. 
{*}interceptorLoader.loadConfiguredInstances(){*}.  This approach enables reuse 
of the existing *AbstractConfig* methods alongside the new factory method.  It 
also enables reuse of the existing try/catch interceptor *close()* cleanup 
behavior within the respective clients in the event of a configuration 
failure.  See the example code below.

 
{color:#de350b}Loader interceptorLoader = new Loader(config);
LoadConfiguredInstanceResult loadConfiguredInstanceResult =
        interceptorLoader.loadConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));

List<ConsumerInterceptor<K, V>> interceptorList =
        loadConfiguredInstanceResult.getInstances();
loadConfiguredInstanceResult.throwWhenAnyConfigurationFailed();{color}
 

In terms of developer impact, one could argue that some coding may be required 
under both approaches to get the full benefit of either.  However, I'm open to 
either approach, or to another approach outside of these.

 

 


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-18 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, 
AbstractConfig.getConfiguredInstances() is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured List of interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved.  If one interceptor's configure method 
creates and/or depends on objects that create threads, connections, or other 
resources requiring cleanup, and a subsequent interceptor's configure method 
raises a runtime exception, the first interceptor leaks its resources.  The 
leak occurs because the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
close method of the first interceptor, and really of any interceptor, is never 
called.
 

To help ensure the respective interceptor containers are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
defining a default open or configureWithResources() or acquireResources() 
method with no implementation and a checked exception on the respective 
Consumer/Producer interceptor interfaces.  This method will be responsible for 
creating threads and/or objects that use threads, connections, or other 
resources requiring cleanup.  Additionally, the default method makes 
implementation optional, as its empty default behavior means it does nothing 
when unimplemented, mitigating the backwards-compatibility impact on existing 
interceptors.

Additionally, the Kafka Consumer/Producer interceptor containers will implement 
a corresponding maybeOpen or maybeConfigureWithResources or 
maybeAcquireResources method, which also throws a checked exception.

Alternatively, to avoid changing a public interface and the subsequent KIP 
process, AbstractConfig.getConfiguredInstances() can be replaced with a new 
factory that wraps the AbstractConfig while introducing a new factory method 
that addresses the problem.

 

 

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, 
AbstractConfig.getConfiguredInstances() is delegated responsibility for both 
creating and configuring each interceptor listed in the interceptor.classes 
property, and returns a configured List of interceptors.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved.  If one interceptor's configure method 
creates and/or depends on objects that create threads, connections, or other 
resources requiring cleanup, and a subsequent interceptor's configure method 
raises a runtime exception, the first interceptor leaks its resources.  The 
leak occurs because the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
close method of the first interceptor, and really of any interceptor, is never 
called.
 

To help ensure the respective interceptor containers are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
defining a default open method with no implementation and a checked exception 
on the respective Consumer/Producer interceptor interfaces.  This open method 
will be responsible for creating threads and/or objects that use threads, 
connections, or other resources requiring cleanup.  Additionally, the default 
open method makes implementation optional, as its empty default behavior means 
it does nothing when unimplemented.

Additionally, the Kafka Consumer/Producer interceptor containers will implement 
a corresponding maybeOpen method that throws a checked exception.  In order to 
maintain backwards compatibility with earlier developed interceptors, 
maybeOpen will check whether the interceptor's interface contains the newer 
open method before calling it.


> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry B

[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-10 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Labels: needs-kip  (was: )

> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, 
> AbstractConfig.getConfiguredInstances() is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property, and returns a configured List of interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved.  If one interceptor's configure 
> method creates and/or depends on objects that create threads, connections, or 
> other resources requiring cleanup, and a subsequent interceptor's configure 
> method raises a runtime exception, the first interceptor leaks its 
> resources.  The leak occurs because the interceptor container, i.e. 
> ConsumerInterceptors/ProducerInterceptors, is never created, and therefore 
> the close method of the first interceptor, and really of any interceptor, is 
> never called.
> To help ensure the respective interceptor containers are able to invoke their 
> respective interceptor close methods for proper resource cleanup, I propose 
> defining a default open method with no implementation and a checked exception 
> on the respective Consumer/Producer interceptor interfaces.  This open method 
> will be responsible for creating threads and/or objects that use threads, 
> connections, or other resources requiring cleanup.  Additionally, the default 
> open method makes implementation optional, as its empty default behavior 
> means it does nothing when unimplemented.
> Additionally, the Kafka Consumer/Producer interceptor containers will 
> implement a corresponding maybeOpen method that throws a checked exception.  
> In order to maintain backwards compatibility with earlier developed 
> interceptors, maybeOpen will check whether the interceptor's interface 
> contains the newer open method before calling it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-09 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Summary: Improve Interceptor Resource Leakage Prevention  (was: Improving 
Interceptor Resource Leakage Prevention)

> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>


