> Add A No Implementation Default Open Method To Consumer and Producer 
> Interceptor Interfaces
> -------------------------------------------------------------------------------------------
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> AbstractConfig.getConfiguredInstances()  is delagated responsibilty for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property and returns a configured  List<ConsumerInterceptor<K,V>> 
> interceptors.
> h2. Kafka Consumer Constructor
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
> h2. Kafka Producer Constructor
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception results produces a resource leakage in 
> the first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors are never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> h2. KafkaConsumer Constructor
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> .... {code}
> If the above line results in a runtime exception, the below this.interceptors 
> is never created. 
> {code:java}
> this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
> h2. Kafka Producer{color:#172b4d} Constructor{color}
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); 
> {code}
> If the above line results in a runtime exception, the below this.interceptors 
> is never created. 
> {code:java}
> if (interceptors != null)
>     this.interceptors = interceptors;
> else
>     this.interceptors = new ProducerInterceptors<>(interceptorList);
> .... {code}
> Although, both Kafka Consumer and Kafka Producer constructors try/catch 
> implement  close for resource clean up, 
> {code:java}
> ...
> catch (Throwable t) {
>     // call close methods if internal objects are already constructed; this 
> is to prevent resource leak. see KAFKA-2121
>     // we do not need to call `close` at all when `log` is null, which means 
> no internal objects were initialized.
>     if (this.log != null) {
>         close(0, true);
>     }
>     // now propagate the exception
>     throw new KafkaException("Failed to construct kafka consumer", t);
> } {code}
> their respective close implementation located in the catch above never calls 
> the respective container interceptor close method below as the 
> {color:#172b4d}*this{color}.{color:#403294}interceptors{color}* was never 
> created.
> {code:java}
> private void close(long timeoutMs, boolean swallowException) {
>  ....  
> Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
>   .... {code}
> This problem is magnified within a webserver cluster i.e. Confluent's REST 
> Proxy server where thousands of requests containing interceptor configuration 
> failures can occur in seconds resulting in an inadvertent DDoS attack as 
> cluster resources are quickly exhausted, disrupting all service activities.   
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> defining a default open method with no implementation and a check exception 
> on the respective Consumer/Producer interceptor interfaces.  This open method 
> will be responsible for creating threads and/or objects which utilizes 
> threads, connections or other resource which requires clean up.  
> Additionally, the default open method enables implementation optionality as 
> it's empty default behavior means it will do nothing on unimplemented classes 
> of this interceptor interface.  
> {code:java}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.Configurable;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Map;
> public interface ConsumerInterceptor<K, V> extends Configurable, 
> AutoCloseable {    
>     ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);    
>     void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);    
>     default void open() throws Exception {};
>     void close();
> }
>  {code}
> {code:java}
> package org.apache.kafka.clients.producer;
> import org.apache.kafka.common.Configurable;
> public interface ProducerInterceptor<K, V> extends Configurable, 
> AutoCloseable {
>     ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    
>     void onAcknowledgement(RecordMetadata metadata, Exception exception);
>     default void open() throws Exception {};
>     void close();
> }
>  {code}
> {color:#172b4d}Additionally, the Kafka Consumer/Producer Interceptor 
> containers will implement a corresponding maybeOpen method which throws a 
> checked Exception.  It's called maybeOpen for backwards compatibility purpose 
> as it must  determine whether an interceptor's interface contains the newer 
> open method before calling it accordingly.   {color}
> {color:#172b4d}{*}NOTE{*}: Developers are encouraged to throw a more specific 
> exception.{color}
> {code:java}
> package org.apache.kafka.clients.consumer.internals;
> import org.apache.kafka.clients.consumer.ConsumerInterceptor;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;import java.io.Closeable;
> import java.util.List;
> import java.util.Arrays;
> import java.util.Map;/**
> public class ConsumerInterceptors<K, V> implements Closeable {
>     private static final Logger log = 
> LoggerFactory.getLogger(ConsumerInterceptors.class);
>     private final List<ConsumerInterceptor<K, V>> interceptors;    
> public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }    
>     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
>         ConsumerRecords<K, V> interceptRecords = records;
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptRecords = interceptor.onConsume(interceptRecords);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue 
> calling other interceptors
>                 log.warn("Error executing interceptor onConsume callback", e);
>             }
>         }
>         return interceptRecords;
>     }    
>     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.onCommit(offsets);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, just log
>                 log.warn("Error executing interceptor onCommit callback", e);
>             }
>         }
>     }    
>     @Override
>     public void close() {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close consumer interceptor ", e);
>             }
>         }
>     }    public List<ConsumerInterceptor<K, V>> getInterceptors() {
>         return interceptors;
>     }    
>      /**
>      * Only interceptors which implement {@link ConsumerInterceptor#open()} 
> are   called by the container.  This is for backwards
>      * compatibility as older interceptors do not contain the default open()
>      * */
>     public void maybeOpen() throws Exception {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
> if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
> method.getName() == "open")){
>                     interceptor.open();
>                 }
>              } catch (Exception e) {
>                 log.error("Failed to open consumer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> } {code}
> {code:java}
> package org.apache.kafka.clients.producer.internals;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.record.RecordBatch;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;import java.io.Closeable;
> import java.util.Arrays;
> import java.util.List;/**
> public class ProducerInterceptors<K, V> implements Closeable {
>     private static final Logger log = 
> LoggerFactory.getLogger(ProducerInterceptors.class);
>     private final List<ProducerInterceptor<K, V>> interceptors;    
> public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
> public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
>         ProducerRecord<K, V> interceptRecord = record;
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptRecord = interceptor.onSend(interceptRecord);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue 
> calling other interceptors
>                 // be careful not to throw exception from here
>                 if (record != null)
>                     log.warn("Error executing interceptor onSend callback for 
> topic: {}, partition: {}", record.topic(), record.partition(), e);
>                 else
>                     log.warn("Error executing interceptor onSend callback", 
> e);
>             }
>         }
>         return interceptRecord;
>     }    
> public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.onAcknowledgement(metadata, exception);
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement 
> callback", e);
>             }
>         }
>     }    
>  public void onSendError(ProducerRecord<K, V> record, TopicPartition 
> interceptTopicPartition, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 if (record == null && interceptTopicPartition == null) {
>                     interceptor.onAcknowledgement(null, exception);
>                 } else {
>                     if (interceptTopicPartition == null) {
>                         interceptTopicPartition = 
> extractTopicPartition(record);
>                     }
>                     interceptor.onAcknowledgement(new 
> RecordMetadata(interceptTopicPartition, -1, -1,
>                                     RecordBatch.NO_TIMESTAMP, -1, -1), 
> exception);
>                 }
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement 
> callback", e);
>             }
>         }
>     }    
>   public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, 
> V> record) {
>         return new TopicPartition(record.topic(), record.partition() == null 
> ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
>     }
>     @Override
>     public void close() {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close producer interceptor ", e);
>             }
>         }
>     }    
>     /**
>      * Only interceptors which implement {@link ProducerInterceptor#open()} 
> are called by the container.  This is for backwards
>      * compatibility as older interceptors do not contain the default open()
>      * */
>     public void maybeOpen() throws Exception {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
> if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
> method.getName() == "open")){
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open producer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> }
>  {code}
> In summary, the overall workflow is that after the configured interceptor 
> instances are returned by the AbstractConfig.getConfiguredInstances(),  the 
> Kafka Consumer/Producer constructor's respective interceptor container 
> maybeOpen method will be called.  
> If in the maybeOpen call, an exception occurs following the interceptor open 
> method call, the respective client constructor's try/catch will call the 
> interceptor container's close method which in-turn loops through and calls 
> each interceptor's close method for clean up of resources allocated in the 
> interceptor open method.
> If an exception occurs in the configure method all objects will be garbage 
> collected as this method must no longer be used for creating threads and/or 
> objects which utilizes threads, connections or other resources which requires 
> clean up.  
> h2. Kafka Consumer Constructor maybeOpen example
> {code:java}
> ...
>         List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ConsumerInterceptor.class,
>                 Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
> clientId));        this.interceptors = new 
> ConsumerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();
> ...{code}
> h2. Kafka Producer {color:#172b4d}maybeOpen{color} example
> {code:java}
> ...        
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ProducerInterceptor.class,
>                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, 
> clientId));
>         if (interceptors != null)
>             this.interceptors = interceptors;
>         else
>             this.interceptors = new ProducerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();  
> ...{code}

