Terry Beard created KAFKA-14565:
-----------------------------------

             Summary: Add A No Implementation Default Open Method To Consumer 
and Producer Interceptor Interfaces
                 Key: KAFKA-14565
                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
             Project: Kafka
          Issue Type: Improvement
          Components: clients
            Reporter: Terry Beard
            Assignee: Terry Beard


h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources.

Currently, within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K,V>> (or List<ProducerInterceptor<K,V>>) of interceptors.
h2. Kafka Consumer Constructor

 
{code:java}
try {
    ....
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

 
h2. Kafka Producer Constructor
{code:java}
try {
    ....
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
This dual responsibility for both creation and configuration is problematic with multiple interceptors when at least one interceptor's configure() implementation creates (or depends on objects that create) threads, connections, or other resources requiring cleanup, and a subsequent interceptor's configure() method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor: the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close() method (indeed, any interceptor's close() method) is never called.
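
For illustration, here is a minimal, hypothetical interceptor (not from the Kafka codebase; class name assumed) whose configure() starts a thread that only close() stops. If a later interceptor in interceptor.classes throws from configure(), this thread is leaked:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MetricsForwardingInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private ExecutorService executor;

    @Override
    public void configure(Map<String, ?> configs) {
        // Resource allocated during configure(): if a subsequent interceptor's
        // configure() throws, the container is never built, close() is never
        // called, and this thread is leaked.
        executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> { /* long-running forwarding loop elided */ });
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() {
        if (executor != null)
            executor.shutdownNow();
    }
}
{code}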
 
h2. Kafka Consumer Constructor
{code:java}
try {
    ....
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
    ....
{code}
If the above call raises a runtime exception, the line below is never reached and this.interceptors is never created.
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer Constructor
{code:java}
try {
    ....
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above call raises a runtime exception, the lines below are never reached and this.interceptors is never created.
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
 

Although both the KafkaConsumer and KafkaProducer constructors implement a try/catch that calls close() for resource cleanup,
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
the close() implementation invoked in the catch block above never calls the container interceptor close() method below, as this.interceptors was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
    ....
    Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
    ....
{code}
This problem is magnified within a webserver cluster, e.g. Confluent's REST Proxy, where thousands of requests containing interceptor configuration failures can occur in seconds, producing an inadvertent DDoS attack as cluster resources are quickly exhausted and all service activities are disrupted.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke each interceptor's close() method for proper resource cleanup, I propose defining a default open() method, with an empty implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open() method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open() method keeps implementation optional: its empty default behavior means it does nothing in implementing classes that do not override it.

 
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    // new: optional lifecycle hook; the empty default keeps older interceptors source-compatible
    default void open() throws Exception { }

    void close();
}
{code}
 

 
{code:java}
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {

    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // new: optional lifecycle hook, mirroring ConsumerInterceptor#open()
    default void open() throws Exception { }

    void close();
}
{code}
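
Under this proposal, a hypothetical interceptor (illustrative only; class and property names assumed) moves its resource allocation out of configure() and into open(), so a failure anywhere in the interceptor chain can be unwound via close():
{code:java}
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ForwardingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private String endpoint;          // read in configure()
    private ExecutorService executor; // created in open(), released in close()

    @Override
    public void configure(Map<String, ?> configs) {
        // read-only: no threads or connections are created here
        endpoint = (String) configs.get("forwarding.endpoint"); // assumed property
    }

    @Override
    public void open() throws Exception {
        // resource allocation now happens here, after the container exists,
        // so the constructor's catch block can always reach close()
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() {
        if (executor != null)
            executor.shutdownNow();
    }
}
{code}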
 

 

Additionally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen() method which throws a checked Exception. It is called maybeOpen for backwards-compatibility purposes, as it must determine whether an interceptor's class implements the newer open() method before calling it.

*NOTE*: Developers are encouraged to throw a more specific exception.
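
For illustration, a hypothetical implementation (class name and endpoint assumed) can narrow the throws clause of its open() override to a more specific checked exception:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.net.Socket;
import java.util.Map;

public class AuditingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private Socket socket;

    @Override
    public void configure(Map<String, ?> configs) { }

    // the override narrows "throws Exception" to IOException
    @Override
    public void open() throws IOException {
        socket = new Socket("audit.example.com", 9000); // assumed endpoint
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException e) {
            // ignore failures on close
        }
    }
}
{code}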

 
{code:java}
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;

    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }

    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }

    /**
     * Calls open() only on interceptors whose class implements {@link ConsumerInterceptor#open()}.
     * This is for backwards compatibility, as older interceptors do not contain the default open().
     */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                // compare method names with equals(), not ==
                if (Arrays.stream(interceptor.getClass().getMethods())
                        .anyMatch(method -> method.getName().equals("open"))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
 

 

 
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                            RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }

    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }

    /**
     * Calls open() only on interceptors whose class implements {@link ProducerInterceptor#open()}.
     * This is for backwards compatibility, as older interceptors do not contain the default open().
     */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                // compare method names with equals(), not ==
                if (Arrays.stream(interceptor.getClass().getMethods())
                        .anyMatch(method -> method.getName().equals("open"))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
 {code}
In summary, the overall workflow is as follows: after the configured interceptor instances are returned by AbstractConfig.getConfiguredInstances(), the Kafka Consumer/Producer constructor calls the respective interceptor container's maybeOpen() method. If, within the maybeOpen() call, an exception occurs following an interceptor's open() call, the respective client constructor's try/catch calls the interceptor container's close() method, which in turn loops through and calls each interceptor's close() method to clean up the resources allocated in the interceptors' open() methods.

 
If an exception occurs in the configure() method, all objects will simply be garbage collected, as configure() must no longer be used for creating threads and/or objects that use threads, connections, or other resources requiring cleanup.
h2. Kafka Consumer Constructor maybeOpen example

 
{code:java}
...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
...{code}
h2. Kafka Producer Constructor maybeOpen example
{code:java}
...
        List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
...{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
