[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17676359#comment-17676359
 ] 

Terry Beard edited comment on KAFKA-14565 at 1/13/23 12:12 AM:
---------------------------------------------------------------

[~ChrisEgerton]  I understood you the first time.  However, my response to both 
of your suggestions was not very clear.  When you suggested altering 
getConfiguredInstances, my immediate thought was that it conflicts with the 
Open/Closed Principle (OCP).  Likewise, when you suggested duplicating logic 
from getConfiguredInstances into the various clients, with the ability to loop 
through the interceptors and call their respective close methods in the event 
of an exception, my thought was that it conflicts with the Don't Repeat 
Yourself (DRY) principle, as both the Consumer and Producer constructors 
already call their respective interceptors' close methods via the catch block.  
Also, getConfiguredInstances already performs the create/configure.  Its only 
problem is that it is so abstract/generic that it makes no assumptions 
regarding exceptions, leaving them up to the caller to handle.  
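
To make the failure scenario concrete, here is a minimal sketch of the "close 
what was already configured" idea.  SketchInterceptor and ClosingLoader are 
hypothetical names used only for illustration; they are not Kafka classes, and 
this is not how AbstractConfig.getConfiguredInstances is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an interceptor: it can be configured and closed.
interface SketchInterceptor extends AutoCloseable {
    void configure(Map<String, ?> configs);

    @Override
    void close();
}

// Hypothetical helper, not part of Kafka.
final class ClosingLoader {
    private ClosingLoader() {
    }

    // Configures each candidate in order. If one configure call fails, every
    // candidate that was already configured is closed, then the original
    // exception is rethrown to the caller.
    static List<SketchInterceptor> configureAll(List<SketchInterceptor> candidates,
                                                Map<String, ?> configs) {
        List<SketchInterceptor> configured = new ArrayList<>();
        try {
            for (SketchInterceptor candidate : candidates) {
                candidate.configure(configs);
                configured.add(candidate);
            }
            return configured;
        } catch (RuntimeException e) {
            for (SketchInterceptor done : configured) {
                try {
                    done.close();
                } catch (RuntimeException closeFailure) {
                    // Keep the original failure; attach the close failure to it.
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }
}
```

Note that this sketch does not close the interceptor whose configure call 
failed; whether that instance should also be closed is a separate design 
decision.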

NOTE:  The notable exception (pun intended) is that it does not expect 
configure implementations to acquire I/O or thread resources.  

Finally,  IMO, Consumer/Producer interceptor constructors are currently 
saturated with logic.  :P

Regarding your other points:
{quote}IMO the proposed changes to the interceptor interfaces aren't 
significantly clearer to developers and are really only convenient as a 
workaround to issues with the AbstractConfig API. 
{quote}
Yes, my approach may look like a workaround, but I think that is because of my 
OCP decisions, which eventually led to the addition of the default open() 
method on the interceptor interfaces.  That said, we could rename the terse 
open() to something like configureWithResources() or acquireResources() to 
make the intention clearer.
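
For illustration, a default method of this kind might look like the sketch 
below.  ResourceAwareInterceptor, LegacyInterceptor and SocketBackedInterceptor 
are hypothetical names, and the interface is deliberately not the real 
ConsumerInterceptor/ProducerInterceptor; the sketch only shows why an empty 
default keeps existing implementations source-compatible.

```java
// Hypothetical interface used only for this sketch.
interface ResourceAwareInterceptor {
    // Empty default: implementations written before this method existed
    // still compile and simply do nothing here.
    default void acquireResources() throws Exception {
    }

    void close();
}

// An older implementation that knows nothing about acquireResources().
final class LegacyInterceptor implements ResourceAwareInterceptor {
    @Override
    public void close() {
    }
}

// A newer implementation that opts in to the extra lifecycle step.
final class SocketBackedInterceptor implements ResourceAwareInterceptor {
    boolean opened;

    @Override
    public void acquireResources() throws Exception {
        // Real code would open a connection or start a thread here.
        opened = true;
    }

    @Override
    public void close() {
        opened = false;
    }
}
```

Because the method declares a checked exception, an implementation can throw 
something specific instead of wrapping it in a RuntimeException.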
{quote}And to reiterate a point raised above, they will require changes to 
existing interceptor classes in order to provide any benefit to users, which 
means that every existing interceptor release out there would still cause 
leaked resources in the failure scenario described by this issue.
{quote}
I agree that anyone with a design requirement for accessing I/O resources, 
threads, etc. will have to change their code to get the benefit.  But IMO, 
this should be considered a feature rather than a bug fix, as the designers of 
the current client/interceptor may not have envisioned my use case.  But 
without any history, notes, etc. to get inside their heads, I could be way off.

Also, anecdotally speaking, we've had roughly six years of interceptor 
development and AFAIK no one has reported running into this issue.  I suspect 
that anyone who did likely developed a workaround, as I did.  But even if we 
followed your KIPless approach, developers could opt to eventually refactor 
their code to fully enjoy the benefits of your approach and drop their bloated 
workaround.  At least I would.  :)

 

But... in the spirit of your suggestion, what if we replaced 
AbstractConfig.getConfiguredInstances with a custom, yet-to-be-developed 
InterceptorLoader object?  See the example below:

 

 
{code:java}
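// InterceptorLoader and LoadConfiguredInstanceResult are proposed classes
// that do not exist yet.
LoadConfiguredInstanceResult loadConfiguredInstanceResult =
        InterceptorLoader.loadConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
// Fetch the instances first so they remain reachable for close(),
// then surface any configuration failure.
List<ConsumerInterceptor<K, V>> interceptorList =
        loadConfiguredInstanceResult.getInstances();
loadConfiguredInstanceResult.throwExceptionWhenAnyConfigurationFails();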
{code}
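
As a rough sketch of how such a loader could behave, the code below collects 
configuration failures instead of throwing immediately, so the caller always 
gets the created instances back and can close them.  All names 
(ConfigurableSketch, LoadResultSketch, InterceptorLoaderSketch) and the 
factory-based signature are assumptions made for this illustration; they do 
not match any existing Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's Configurable interface.
interface ConfigurableSketch {
    void configure(Map<String, ?> configs);
}

// Hypothetical result object: holds every created instance plus any failures.
final class LoadResultSketch<T> {
    private final List<T> instances;
    private final List<RuntimeException> failures;

    LoadResultSketch(List<T> instances, List<RuntimeException> failures) {
        this.instances = instances;
        this.failures = failures;
    }

    List<T> getInstances() {
        return Collections.unmodifiableList(instances);
    }

    // Throws the first recorded failure, with later ones attached as suppressed.
    void throwExceptionWhenAnyConfigurationFails() {
        if (failures.isEmpty()) {
            return;
        }
        RuntimeException first = failures.get(0);
        for (RuntimeException other : failures.subList(1, failures.size())) {
            if (other != first) {
                first.addSuppressed(other);
            }
        }
        throw first;
    }
}

// Hypothetical loader: creates and configures each instance, recording
// configure failures rather than aborting on the first one.
final class InterceptorLoaderSketch {
    private InterceptorLoaderSketch() {
    }

    static <T extends ConfigurableSketch> LoadResultSketch<T> loadConfiguredInstances(
            List<Supplier<T>> factories, Map<String, ?> configs) {
        List<T> instances = new ArrayList<>();
        List<RuntimeException> failures = new ArrayList<>();
        for (Supplier<T> factory : factories) {
            T instance = factory.get();
            // Record the instance before configuring it so it stays reachable.
            instances.add(instance);
            try {
                instance.configure(configs);
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return new LoadResultSketch<>(instances, failures);
    }
}
```

With this shape, a client constructor could hand the returned instances to its 
interceptor container before calling throwExceptionWhenAnyConfigurationFails(), 
so that the existing catch block can close them.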
 

 

Lastly, the one thing I do not like about the current interceptor interface is 
that, in implementing the Configurable interface's configure method, I have to 
wrap exceptions within a RuntimeException, which is a pain when 
reading/splunking log files.  By contrast, 
configureWithResources()/acquireResources() would declare a checked Exception, 
which can be more specific.  

> Improve Interceptor Resource Leakage Prevention
> -----------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> AbstractConfig.getConfiguredInstances()  is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property and returns a configured  List<ConsumerInterceptor<K,V>> 
> interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved, at least one interceptor's 
> configure method creates and/or depends on objects which create threads, 
> connections or other resources which require cleanup, and a subsequent 
> interceptor's configure method raises a runtime exception.  
> The runtime exception produces a resource leak in the first interceptor 
> because the interceptor container, i.e. 
> ConsumerInterceptors/ProducerInterceptors, is never created, and therefore 
> the close method of the first interceptor, and really of any interceptor, is 
> never called.  
> To help ensure the respective interceptor containers are able to invoke 
> their interceptors' close methods for proper resource cleanup, I propose 
> defining a default open method, with an empty implementation and a checked 
> exception, on the respective Consumer/Producer interceptor interfaces.  This 
> open method will be responsible for creating threads and/or objects which 
> utilize threads, connections or other resources which require cleanup.  
> Additionally, the default open method makes implementation optional, as 
> its empty default behavior means it will do nothing when unimplemented.  
> Additionally, the Kafka Consumer/Producer interceptor containers will 
> implement a corresponding maybeOpen method which throws a checked exception.  
> In order to maintain backwards compatibility with earlier developed 
> interceptors, maybeOpen will check whether the interceptor's interface 
> contains the newer open method before calling it.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
