[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:
--------------------------------
    Description: 
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources.

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
AbstractConfig.getConfiguredInstances()  is delagated responsibilty for both 
creating and configuring each interceptor listed in the interceptor.classes 
property and returns a configured  List<ConsumerInterceptor<K,V>> interceptors.
h2. Kafka Consumer Constructor

 
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
 

 
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
This dual responsibility for both creation and configuration is problematic 
when it involves multiple interceptors where at least one interceptor's 
configure method implementation creates and/or depends on objects which creates 
threads, connections or other resources which requires clean up and the 
subsequent interceptor's configure method raises a runtime exception.  This 
raising of the runtime exception results produces a resource leakage in the 
first interceptor as the interceptor container i.e. 
ConsumerInterceptors/ProducerInterceptors are never created and therefore the 
first interceptor's and really any interceptor's close method are never called. 
 
h2. KafkaConsumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
.... {code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer{color:#172b4d} Constructor{color}
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); 
{code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
 

Although, both Kafka Consumer and Kafka Producer constructors try/catch 
implement  close for resource clean up, 
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is 
to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no 
internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation located in the catch above never calls 
the respective container interceptor close method below as the 
{color:#172b4d}*this{color}.{color:#403294}interceptors{color}* was never 
created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
 ....  
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
  .... {code}
This problem is magnified within a webserver cluster i.e. Confluent's REST 
Proxy server where thousands of requests containing interceptor configuration 
failures can occur in seconds resulting in an inadvertent DDoS attack as 
cluster resources are quickly exhausted, disrupting all service activities.   
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource clean up, I propose 
defining a default open method with no implementation and a check exception on 
the respective Consumer/Producer interceptor interfaces.  This open method will 
be responsible for creating threads and/or objects which utilizes threads, 
connections or other resource which requires clean up.  Additionally, the 
default open method enables implementation optionality as it's empty default 
behavior means it will do nothing on unimplemented classes of this interceptor 
interface.  

 
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;


public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable 
{    
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);    

    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);    

    default void open() throws Exception {};
   
    void close();
}
 {code}
 

 
{code:java}
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
   
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
    default void open() throws Exception {};
    
    void close();
}
 {code}
 

 

{color:#172b4d}Additionally, the Kafka Consumer/Producer Interceptor containers 
will implement a corresponding maybeOpen method which throws a checked 
Exception.  It's called maybeOpen for backwards compatibility purpose as it 
must  determine whether an interceptor's interface contains the newer open 
method before calling it accordingly.   {color}

{color:#172b4d}{*}NOTE{*}: Developers are encouraged to throw a more specific 
exception.{color}

 
{code:java}
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;/**

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = 
LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;    

public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }    

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue 
calling other interceptors
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }    

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }    

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }    
     /**
     * Only interceptors which implement {@link ConsumerInterceptor#open()} are 
  called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
method.getName() == "open")){
                    interceptor.open();
                }
             } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
 

 

 
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.Arrays;
import java.util.List;/**

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = 
LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;    

public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
   
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue 
calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for 
topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }    

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement 
callback", e);
            }
        }
    }    

 public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, -1, -1), 
exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement 
callback", e);
            }
        }
    }    
  public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, 
V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? 
RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }
    
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }    

    /**
     * Only interceptors which implement {@link ProducerInterceptor#open()} are 
called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
method.getName() == "open")){
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
 {code}
In summary, the overall workflow is that after the configured interceptor 
instances are returned by the AbstractConfig.getConfiguredInstances(),  the 
Kafka Consumer/Producer constructor's respective interceptor container 
maybeOpen method will be called.  
If in the maybeOpen call, an exception occurs following the interceptor open 
method call, the respective client constructor's try/catch will call the 
interceptor container's close method which in-turn loops through and calls each 
interceptor's close method for clean up of resources allocated in the 
interceptor open method.

 
If an exception occurs in the configure method all objects will be garbage 
collected as this method must no longer be used for creating threads and/or 
objects which utilizes threads, connections or other resources which requires 
clean up.  
h2. Kafka Consumer Constructor maybeOpen example

 
{code:java}
...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId));        this.interceptors = new 
ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
...{code}
h2. Kafka Producer {color:#172b4d}maybeOpen{color} example
{code:java}
...        
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, 
clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();  
...{code}
 

 

 

 

 

  was:
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. [More 
colors|https://issues.apache.org/jira/secure/CreateIssue.jspa#]

Currently within the Kafka Consumer and Kafka Producer constructors,  the 
AbstractConfig.getConfiguredInstances()  is delagated responsibilty for both 
creating and configuring each interceptor listed in the interceptor.classes 
property and returns a configured  List<ConsumerInterceptor<K,V>> interceptors.
h2. Kafka Consumer Constructor

 
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
 

 
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
This dual responsibility for both creation and configuration is problematic 
when it involves multiple interceptors where at least one interceptor's 
configure method implementation creates and/or depends on objects which creates 
threads, connections or other resources which requires clean up and the 
subsequent interceptor's configure method raises a runtime exception.  This 
raising of the runtime exception results produces a resource leakage in the 
first interceptor as the interceptor container i.e. 
ConsumerInterceptors/ProducerInterceptors are never created and therefore the 
first interceptor's and really any interceptor's close method are never called. 
 
h2. KafkaConsumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
.... {code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer{color:#172b4d} Constructor{color}
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); 
{code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
 

Although, both Kafka Consumer and Kafka Producer constructors try/catch 
implement  close for resource clean up, 
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is 
to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no 
internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation located in the catch above never calls 
the respective container interceptor close method below as the 
{color:#172b4d}*{color:#ffab00}this{color}.{color:#403294}interceptors{color}*{color}{color:#403294}
 {color}was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
 ....  
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
  .... {code}
This problem is magnified within a webserver cluster i.e. Confluent's REST 
Proxy server where thousands of requests containing interceptor configuration 
failures can occur in seconds resulting in an inadvertent DDoS attack as 
cluster resources are quickly exhausted, disrupting all service activities.   
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource clean up, I propose 
defining a default open method with no implementation and a check exception on 
the respective Consumer/Producer interceptor interfaces.  This open method will 
be responsible for creating threads and/or objects which utilizes threads, 
connections or other resource which requires clean up.  Additionally, the 
default open method enables implementation optionality as it's empty default 
behavior means it will do nothing on unimplemented classes of this interceptor 
interface.  

 
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;


public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable 
{    
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);    

    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);    

    default void open() throws Exception {};
   
    void close();
}
 {code}
 

 
{code:java}
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
   
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
    default void open() throws Exception {};
    
    void close();
}
 {code}
 

 

{color:#172b4d}Additionally, the Kafka Consumer/Producer Interceptor containers 
will implement a corresponding maybeOpen method which throws a checked 
Exception.  It's called maybeOpen for backwards compatibility purpose as it 
must  determine whether an interceptor's interface contains the newer open 
method before calling it accordingly.   {color}

{color:#172b4d}{*}NOTE{*}: Developers are encouraged to throw a more specific 
exception.{color}

 
{code:java}
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;/**

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = 
LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;    

public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }    

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue 
calling other interceptors
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }    

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }    

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }    
     /**
     * Only interceptors which implement {@link ConsumerInterceptor#open()} are 
  called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
method.getName() == "open")){
                    interceptor.open();
                }
             } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
 

 

 
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.Arrays;
import java.util.List;/**

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = 
LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;    

public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
   
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue 
calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for 
topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }    

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement 
callback", e);
            }
        }
    }    

 public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, -1, -1), 
exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement 
callback", e);
            }
        }
    }    
  public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, 
V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? 
RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }
    
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }    

    /**
     * Only interceptors which implement {@link ProducerInterceptor#open()} are 
called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
method.getName() == "open")){
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
 {code}
In summary, the overall workflow is that after the configured interceptor 
instances are returned by the AbstractConfig.getConfiguredInstances(),  the 
Kafka Consumer/Producer constructor's respective interceptor container 
maybeOpen method will be called.  
If in the maybeOpen call, an exception occurs following the interceptor open 
method call, the respective client constructor's try/catch will call the 
interceptor container's close method which in-turn loops through and calls each 
interceptor's close method for clean up of resources allocated in the 
interceptor open method.

 
If an exception occurs in the configure method all objects will be garbage 
collected as this method must no longer be used for creating threads and/or 
objects which utilizes threads, connections or other resources which requires 
clean up.  
h2. Kafka Consumer Constructor maybeOpen example

 
{code:java}
...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId));        this.interceptors = new 
ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
...{code}
h2. Kafka Producer {color:#172b4d}maybeOpen{color} example
{code:java}
...        
List<ProducerInterceptor<K, V>> interceptorList = (List) 
config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, 
clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();  
...{code}
 

 

 

 

 


> Add A No Implementation Default Open Method To Consumer and Producer 
> Interceptor Interfaces
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>
> h2. PROBLEM
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources.
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> AbstractConfig.getConfiguredInstances()  is delagated responsibilty for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property and returns a configured  List<ConsumerInterceptor<K,V>> 
> interceptors.
> h2. Kafka Consumer Constructor
>  
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
>  
>  
> h2. Kafka Producer Constructor
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception results produces a resource leakage in 
> the first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors are never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> h2. KafkaConsumer Constructor
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> .... {code}
> If the above line results in a runtime exception, the below this.interceptors 
> is never created. 
> {code:java}
> this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
> h2. Kafka Producer{color:#172b4d} Constructor{color}
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); 
> {code}
> If the above line results in a runtime exception, the below this.interceptors 
> is never created. 
> {code:java}
> if (interceptors != null)
>     this.interceptors = interceptors;
> else
>     this.interceptors = new ProducerInterceptors<>(interceptorList);
> .... {code}
>  
> Although, both Kafka Consumer and Kafka Producer constructors try/catch 
> implement  close for resource clean up, 
> {code:java}
> ...
> catch (Throwable t) {
>     // call close methods if internal objects are already constructed; this 
> is to prevent resource leak. see KAFKA-2121
>     // we do not need to call `close` at all when `log` is null, which means 
> no internal objects were initialized.
>     if (this.log != null) {
>         close(0, true);
>     }
>     // now propagate the exception
>     throw new KafkaException("Failed to construct kafka consumer", t);
> } {code}
> their respective close implementation located in the catch above never calls 
> the respective container interceptor close method below as the 
> {color:#172b4d}*this{color}.{color:#403294}interceptors{color}* was never 
> created.
> {code:java}
> private void close(long timeoutMs, boolean swallowException) {
>  ....  
> Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
>   .... {code}
> This problem is magnified within a webserver cluster i.e. Confluent's REST 
> Proxy server where thousands of requests containing interceptor configuration 
> failures can occur in seconds resulting in an inadvertent DDoS attack as 
> cluster resources are quickly exhausted, disrupting all service activities.   
> h2. PROPOSAL
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> defining a default open method with no implementation and a check exception 
> on the respective Consumer/Producer interceptor interfaces.  This open method 
> will be responsible for creating threads and/or objects which utilizes 
> threads, connections or other resource which requires clean up.  
> Additionally, the default open method enables implementation optionality as 
> it's empty default behavior means it will do nothing on unimplemented classes 
> of this interceptor interface.  
>  
> {code:java}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.Configurable;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Map;
> public interface ConsumerInterceptor<K, V> extends Configurable, 
> AutoCloseable {    
>     ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);    
>     void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);    
>     default void open() throws Exception {};
>    
>     void close();
> }
>  {code}
>  
>  
> {code:java}
> package org.apache.kafka.clients.producer;
> import org.apache.kafka.common.Configurable;
> public interface ProducerInterceptor<K, V> extends Configurable, 
> AutoCloseable {
>    
>     ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    
>     void onAcknowledgement(RecordMetadata metadata, Exception exception);
>     
>     default void open() throws Exception {};
>     
>     void close();
> }
>  {code}
>  
>  
> {color:#172b4d}Additionally, the Kafka Consumer/Producer Interceptor 
> containers will implement a corresponding maybeOpen method which throws a 
> checked Exception.  It's called maybeOpen for backwards compatibility purpose 
> as it must  determine whether an interceptor's interface contains the newer 
> open method before calling it accordingly.   {color}
> {color:#172b4d}{*}NOTE{*}: Developers are encouraged to throw a more specific 
> exception.{color}
>  
> {code:java}
> package org.apache.kafka.clients.consumer.internals;
> import org.apache.kafka.clients.consumer.ConsumerInterceptor;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;import java.io.Closeable;
> import java.util.List;
> import java.util.Arrays;
> import java.util.Map;/**
> public class ConsumerInterceptors<K, V> implements Closeable {
>     private static final Logger log = 
> LoggerFactory.getLogger(ConsumerInterceptors.class);
>     private final List<ConsumerInterceptor<K, V>> interceptors;    
> public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }    
>     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
>         ConsumerRecords<K, V> interceptRecords = records;
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptRecords = interceptor.onConsume(interceptRecords);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue 
> calling other interceptors
>                 log.warn("Error executing interceptor onConsume callback", e);
>             }
>         }
>         return interceptRecords;
>     }    
>     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.onCommit(offsets);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, just log
>                 log.warn("Error executing interceptor onCommit callback", e);
>             }
>         }
>     }    
>     @Override
>     public void close() {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close consumer interceptor ", e);
>             }
>         }
>     }    public List<ConsumerInterceptor<K, V>> getInterceptors() {
>         return interceptors;
>     }    
>      /**
>      * Only interceptors which implement {@link ConsumerInterceptor#open()} 
> are   called by the container.  This is for backwards
>      * compatibility as older interceptors do not contain the default open()
>      * */
>     public void maybeOpen() throws Exception {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 
> if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
> method.getName() == "open")){
>                     interceptor.open();
>                 }
>              } catch (Exception e) {
>                 log.error("Failed to open consumer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> } {code}
>  
>  
>  
> {code:java}
> package org.apache.kafka.clients.producer.internals;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.record.RecordBatch;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;import java.io.Closeable;
> import java.util.Arrays;
> import java.util.List;/**
> public class ProducerInterceptors<K, V> implements Closeable {
>     private static final Logger log = 
> LoggerFactory.getLogger(ProducerInterceptors.class);
>     private final List<ProducerInterceptor<K, V>> interceptors;    
> public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
>    
> public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
>         ProducerRecord<K, V> interceptRecord = record;
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptRecord = interceptor.onSend(interceptRecord);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue 
> calling other interceptors
>                 // be careful not to throw exception from here
>                 if (record != null)
>                     log.warn("Error executing interceptor onSend callback for 
> topic: {}, partition: {}", record.topic(), record.partition(), e);
>                 else
>                     log.warn("Error executing interceptor onSend callback", 
> e);
>             }
>         }
>         return interceptRecord;
>     }    
> public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.onAcknowledgement(metadata, exception);
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement 
> callback", e);
>             }
>         }
>     }    
>  public void onSendError(ProducerRecord<K, V> record, TopicPartition 
> interceptTopicPartition, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 if (record == null && interceptTopicPartition == null) {
>                     interceptor.onAcknowledgement(null, exception);
>                 } else {
>                     if (interceptTopicPartition == null) {
>                         interceptTopicPartition = 
> extractTopicPartition(record);
>                     }
>                     interceptor.onAcknowledgement(new 
> RecordMetadata(interceptTopicPartition, -1, -1,
>                                     RecordBatch.NO_TIMESTAMP, -1, -1), 
> exception);
>                 }
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement 
> callback", e);
>             }
>         }
>     }    
>   public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, 
> V> record) {
>         return new TopicPartition(record.topic(), record.partition() == null 
> ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
>     }
>     
>     @Override
>     public void close() {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close producer interceptor ", e);
>             }
>         }
>     }    
>     /**
>      * Only interceptors which implement {@link ProducerInterceptor#open()} 
> are called by the container.  This is for backwards
>      * compatibility as older interceptors do not contain the default open()
>      * */
>     public void maybeOpen() throws Exception {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 
> if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> 
> method.getName() == "open")){
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open producer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> }
>  {code}
> In summary, the overall workflow is that after the configured interceptor 
> instances are returned by the AbstractConfig.getConfiguredInstances(),  the 
> Kafka Consumer/Producer constructor's respective interceptor container 
> maybeOpen method will be called.  
> If in the maybeOpen call, an exception occurs following the interceptor open 
> method call, the respective client constructor's try/catch will call the 
> interceptor container's close method which in-turn loops through and calls 
> each interceptor's close method for clean up of resources allocated in the 
> interceptor open method.
>  
> If an exception occurs in the configure method all objects will be garbage 
> collected as this method must no longer be used for creating threads and/or 
> objects which utilizes threads, connections or other resources which requires 
> clean up.  
> h2. Kafka Consumer Constructor maybeOpen example
>  
> {code:java}
> ...
>         List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ConsumerInterceptor.class,
>                 Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
> clientId));        this.interceptors = new 
> ConsumerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();
> ...{code}
> h2. Kafka Producer {color:#172b4d}maybeOpen{color} example
> {code:java}
> ...        
> List<ProducerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ProducerInterceptor.class,
>                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, 
> clientId));
>         if (interceptors != null)
>             this.interceptors = interceptors;
>         else
>             this.interceptors = new ProducerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();  
> ...{code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to