[https://issues.apache.org/jira/browse/KAFKA-14566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Terry Beard updated KAFKA-14566:
--------------------------------
    Description:     (was: h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, in the KafkaConsumer and KafkaProducer constructors, 
AbstractConfig.getConfiguredInstances() is responsible for both creating and 
configuring each interceptor listed in the interceptor.classes property, and it 
returns the configured interceptors as a *List<ConsumerInterceptor<K,V>>* (or a 
*List<ProducerInterceptor<K,V>>* for the producer).
h2. Kafka Consumer Constructor
{code:java}
try {
    // ...
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {
    // ...
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));{code}
 

This dual responsibility for creation and configuration is a problem when 
there are multiple interceptors, at least one interceptor's configure method 
creates (or depends on objects that create) threads, connections or other 
resources that require cleanup, and a subsequent interceptor's configure method 
throws a runtime exception. That exception leaks the first interceptor's 
resources: the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, so the close 
method of the first interceptor, or indeed of any interceptor, is never called.
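The leak can be reproduced without Kafka. The following is a minimal, self-contained model, not Kafka code: LegacyInterceptor, ResourceHoldingInterceptor, FailingInterceptor and the configureAll helper are hypothetical names, with configureAll standing in for AbstractConfig.getConfiguredInstances() (create and configure each interceptor in order, and return the list only if every configure call succeeds). A static counter stands in for threads or connections.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a configurable, closeable interceptor (not the Kafka interface).
interface LegacyInterceptor extends AutoCloseable {
    void configure(Map<String, ?> configs);

    @Override
    void close();
}

// Acquires a "resource" in configure(), the pattern this issue describes.
class ResourceHoldingInterceptor implements LegacyInterceptor {
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> configs) {
        OPEN_RESOURCES.incrementAndGet(); // e.g. starting a thread or opening a connection
    }

    @Override
    public void close() {
        OPEN_RESOURCES.decrementAndGet();
    }
}

// Fails during configure(), e.g. because of an invalid config value.
class FailingInterceptor implements LegacyInterceptor {
    @Override
    public void configure(Map<String, ?> configs) {
        throw new IllegalStateException("bad interceptor config");
    }

    @Override
    public void close() { }
}

class LeakDemo {
    // Hypothetical model of getConfiguredInstances(): create, then configure, each instance.
    static List<LegacyInterceptor> configureAll(List<Supplier<LegacyInterceptor>> factories,
                                                Map<String, ?> configs) {
        List<LegacyInterceptor> result = new ArrayList<>();
        for (Supplier<LegacyInterceptor> factory : factories) {
            LegacyInterceptor interceptor = factory.get();
            interceptor.configure(configs); // an exception here discards 'result'
            result.add(interceptor);
        }
        return result;
    }

    static int leakedAfterFailedConstruction() {
        List<Supplier<LegacyInterceptor>> factories =
                List.of(ResourceHoldingInterceptor::new, FailingInterceptor::new);
        try {
            configureAll(factories, Map.of());
        } catch (IllegalStateException e) {
            // The caller never received the list, so it cannot close the first interceptor.
        }
        return ResourceHoldingInterceptor.OPEN_RESOURCES.get();
    }
}
{code}

Because the exception escapes before the list is returned, no caller ever holds a reference it could close.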
h2. Kafka Consumer Constructor
{code:java}
try {
    // ...
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
    // ...{code}
 

If the call above throws a runtime exception, 
{*}{color:#ffab00}this{color}{*}.{*}{color:#403294}interceptors{color}{*} below 
is never assigned.

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {
    // ...
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the call above throws a runtime exception, this.interceptors below is never 
assigned.

 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
// ... {code}
 

Although both the KafkaConsumer and KafkaProducer constructors wrap their 
initialization in a try/catch that calls close for resource cleanup,

 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
 

the close implementation invoked from that catch block never calls the 
interceptor container's close method shown below, because 
{*}{color:#ffab00}this{color}{*}.{*}{color:#403294}interceptors{color}{*} was 
never created.

 
{code:java}
private void close(long timeoutMs, boolean swallowException) {
    // ...
    Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
    // ...{code}
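The catch-block path can be modeled the same way. In this hypothetical sketch, ConstructorLeakModel plays the client, its interceptors field plays this.interceptors, and closeQuietly assumes, as the description implies, that Utils.closeQuietly treats a null argument as a no-op. close() does run, yet the resource acquired before the failure is never released.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

class ConstructorLeakModel {
    // Stands in for threads/connections acquired by the first interceptor's configure().
    static final AtomicInteger OPEN_RESOURCES = new AtomicInteger();
    static boolean closeInvoked = false;

    // Stands in for this.interceptors; it is never assigned when setup throws.
    private AutoCloseable interceptors;

    ConstructorLeakModel() {
        try {
            this.interceptors = createInterceptors();
        } catch (Throwable t) {
            close(); // plays the role of close(0, true) in the real constructor
            throw new RuntimeException("Failed to construct client", t);
        }
    }

    // Hypothetical model of getConfiguredInstances(): the first interceptor acquires
    // a resource, then the second interceptor's configure() throws.
    private static AutoCloseable createInterceptors() {
        OPEN_RESOURCES.incrementAndGet();
        throw new IllegalStateException("second interceptor's configure() failed");
    }

    private void close() {
        closeInvoked = true;
        closeQuietly(interceptors); // interceptors is still null, so nothing is released
    }

    // Hypothetical null-tolerant helper: a null argument is a no-op.
    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            // swallowed for brevity
        }
    }
}
{code}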
 

This problem is magnified in a web server cluster such as Confluent's REST 
Proxy, where thousands of requests with failing interceptor configurations can 
arrive within seconds. The leaked resources accumulate until cluster resources 
are exhausted, effectively an inadvertent DDoS attack that disrupts all service 
activity.
h2. PROPOSAL

To ensure that the interceptor containers can invoke each interceptor's close 
method for proper resource cleanup, I propose adding a default open method, 
with an empty body and a checked exception, to the Consumer/Producer 
interceptor interfaces. The open method will be responsible for creating 
threads, connections or other resources that require cleanup, or objects that 
use them. Because the default implementation is empty, implementing open is 
optional: it does nothing for interceptor classes that do not override it.
h2. Kafka Consumer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
  
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    // No-op by default; override to acquire threads, connections or other resources.
    default void open() throws Exception {}
 
    void close();
}
{code}
 
h2. Kafka Producer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
   
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
   
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // No-op by default; override to acquire threads, connections or other resources.
    default void open() throws Exception {}

    void close();
}
{code}
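An interceptor written against the proposed contract might look like the following sketch. To stay self-contained it uses a hypothetical ProposedInterceptor interface that mirrors the proposal (configure, a no-op default open, close) instead of the real Kafka interfaces; the ReportingInterceptor class and the report.interval.ms key are illustrative only. configure only parses and validates, open starts the background thread, and close tolerates being called when open never ran.

{code:java}
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical mirror of the proposed contract, not the real Kafka interface.
interface ProposedInterceptor extends AutoCloseable {
    void configure(Map<String, ?> configs);

    default void open() throws Exception { }

    @Override
    void close();
}

// Hypothetical interceptor that reports something periodically in the background.
class ReportingInterceptor implements ProposedInterceptor {
    private long reportIntervalMs;
    private ScheduledExecutorService reporter; // created in open(), never in configure()

    @Override
    public void configure(Map<String, ?> configs) {
        // Parse and validate only: no threads, sockets or files are created here.
        Object interval = configs.get("report.interval.ms"); // hypothetical key
        this.reportIntervalMs = interval == null ? 1000L : Long.parseLong(interval.toString());
        if (reportIntervalMs <= 0) {
            throw new IllegalArgumentException("report.interval.ms must be positive");
        }
    }

    @Override
    public void open() {
        reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(() -> { /* report metrics here */ },
                reportIntervalMs, reportIntervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Safe even if open() was never called or failed part-way.
        if (reporter != null) {
            reporter.shutdownNow();
        }
    }

    boolean isRunning() {
        return reporter != null && !reporter.isShutdown();
    }
}
{code}

Keeping close null-safe matters because a container that closes every interceptor will also reach interceptors whose open was never called.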
 

Additionally, the ConsumerInterceptors/ProducerInterceptors containers will 
implement a corresponding maybeOpen method that throws a checked Exception. It 
is named maybeOpen for backwards compatibility, because it must determine 
whether an interceptor has the newer open method before calling it.

 *NOTE:* Developers are encouraged to throw a more specific exception.
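A side note on the reflective check used by maybeOpen below: because open() is a default method, every class implementing the interface inherits a public open(), including classes compiled before open() existed, so Class.getMethods() finds it for every interceptor; the name-only test would also match any public method called open, whatever its parameters. The self-contained sketch below uses hypothetical types (InterceptorWithDefaultOpen, PreExistingInterceptor, OpeningInterceptor, OpenCheck) rather than the Kafka interfaces to show this, plus one possible narrower check based on Method.isDefault() in case the intent is to detect interceptors that actually override open(). Calling the empty default is harmless either way.

{code:java}
import java.util.Arrays;

// Hypothetical interface mirroring the proposal's default open().
interface InterceptorWithDefaultOpen {
    default void open() throws Exception { }

    void close();
}

// Written before open() existed: it does not override open().
class PreExistingInterceptor implements InterceptorWithDefaultOpen {
    @Override
    public void close() { }
}

// Overrides open() to acquire resources.
class OpeningInterceptor implements InterceptorWithDefaultOpen {
    @Override
    public void open() { }

    @Override
    public void close() { }
}

class OpenCheck {
    // Same test as the proposed maybeOpen(): is there any public method named "open"?
    static boolean hasOpenMethod(Object interceptor) {
        return Arrays.stream(interceptor.getClass().getMethods())
                .anyMatch(method -> method.getName().equals("open"));
    }

    // Narrower test: is the public no-arg open() something other than the interface default?
    static boolean overridesOpen(Object interceptor) {
        try {
            return !interceptor.getClass().getMethod("open").isDefault();
        } catch (NoSuchMethodException e) {
            return false; // no public no-arg open() at all
        }
    }
}
{code}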
h2. Kafka Consumer Interceptors MaybeOpen Implementation
{code:java}
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;
    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
 
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }
    /**
     * Closes every interceptor in a container.
     */
    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }
    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }
    /**
     * Only interceptors which implement {@link ConsumerInterceptor#open()} are called by the container. This is for
     * backwards compatibility, as older interceptors do not contain the default open().
     */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> method.getName().equals("open"))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
h2. Kafka Producer Interceptors MaybeOpen Implementation
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;
    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
 
    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                            RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                }
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
    public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }
  
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }
    /**
     * Only interceptors which implement {@link ProducerInterceptor#open()} are called by the container. This is for
     * backwards compatibility, as older interceptors do not contain the default open().
     */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> method.getName().equals("open"))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
{code}
 

In summary, once AbstractConfig.getConfiguredInstances() returns the configured 
interceptor instances, the KafkaConsumer/KafkaProducer constructor creates its 
interceptor container and calls the container's maybeOpen method.

 
If an interceptor's open method throws during the maybeOpen call, the client 
constructor's try/catch calls the interceptor container's close method, which in 
turn loops through and calls each interceptor's close method to clean up the 
resources allocated in the interceptors' open methods.
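The open/close workflow can be sketched as follows. LifecycleInterceptor, TrackingInterceptor, InterceptorContainer and ClientModel are hypothetical, simplified stand-ins for the interceptor interfaces, ConsumerInterceptors/ProducerInterceptors and the client constructor; this maybeOpen calls open() directly, without the reflective check. The second interceptor fails in open(), and the container's close releases the first interceptor's resource.

{code:java}
import java.util.List;

// Hypothetical interceptor with the proposed lifecycle.
interface LifecycleInterceptor extends AutoCloseable {
    default void open() throws Exception { }

    @Override
    void close();
}

// Tracks whether it holds a resource; optionally fails in open().
class TrackingInterceptor implements LifecycleInterceptor {
    private final boolean failOnOpen;
    boolean holdsResource = false;
    boolean closed = false;

    TrackingInterceptor(boolean failOnOpen) {
        this.failOnOpen = failOnOpen;
    }

    @Override
    public void open() throws Exception {
        if (failOnOpen) {
            throw new Exception("open failed");
        }
        holdsResource = true; // e.g. a thread or connection
    }

    @Override
    public void close() {
        holdsResource = false; // safe even if open() never ran
        closed = true;
    }
}

// Simplified container: opens in order, closes every interceptor it holds.
class InterceptorContainer implements AutoCloseable {
    private final List<? extends LifecycleInterceptor> interceptors;

    InterceptorContainer(List<? extends LifecycleInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    void maybeOpen() throws Exception {
        for (LifecycleInterceptor interceptor : interceptors) {
            interceptor.open(); // the first failure propagates to the caller
        }
    }

    @Override
    public void close() {
        for (LifecycleInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (RuntimeException e) {
                // log and keep closing the remaining interceptors
            }
        }
    }
}

class ClientModel {
    // Mirrors the proposed constructor flow: create the container, open it, close it on failure.
    static InterceptorContainer construct(List<? extends LifecycleInterceptor> interceptorList) {
        InterceptorContainer container = new InterceptorContainer(interceptorList);
        try {
            container.maybeOpen();
            return container;
        } catch (Throwable t) {
            container.close();
            throw new RuntimeException("Failed to construct client", t);
        }
    }
}
{code}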

If an exception occurs in a configure method, the interceptor objects created 
so far can simply be garbage collected, because configure must no longer be 
used to create threads, connections or other resources that require cleanup, or 
objects that use them.
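Under the proposed contract, a configure failure no longer leaks anything. In this hypothetical sketch (SideEffectFreeInterceptor, ConnectionInterceptor, RejectingInterceptor and ConfigureFailureDemo are illustrative names, and configureAll models getConfiguredInstances()), the resource is acquired only in open(), which is never reached when a configure() call throws.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical interceptor whose configure() has no side effects.
interface SideEffectFreeInterceptor extends AutoCloseable {
    void configure(Map<String, ?> configs); // parse and validate only

    default void open() throws Exception { }

    @Override
    void close();
}

class ConnectionInterceptor implements SideEffectFreeInterceptor {
    static final AtomicInteger OPEN_CONNECTIONS = new AtomicInteger();
    private boolean opened = false;

    @Override
    public void configure(Map<String, ?> configs) {
        // validation only; nothing to clean up if a later interceptor fails
    }

    @Override
    public void open() {
        OPEN_CONNECTIONS.incrementAndGet();
        opened = true;
    }

    @Override
    public void close() {
        if (opened) {
            OPEN_CONNECTIONS.decrementAndGet();
            opened = false;
        }
    }
}

class RejectingInterceptor implements SideEffectFreeInterceptor {
    @Override
    public void configure(Map<String, ?> configs) {
        throw new IllegalArgumentException("invalid interceptor config");
    }

    @Override
    public void close() { }
}

class ConfigureFailureDemo {
    // Hypothetical model of getConfiguredInstances(): create, then configure, each instance.
    static List<SideEffectFreeInterceptor> configureAll(List<Supplier<SideEffectFreeInterceptor>> factories) {
        List<SideEffectFreeInterceptor> result = new ArrayList<>();
        for (Supplier<SideEffectFreeInterceptor> factory : factories) {
            SideEffectFreeInterceptor interceptor = factory.get();
            interceptor.configure(Map.of());
            result.add(interceptor);
        }
        return result;
    }

    static int openConnectionsAfterConfigureFailure() {
        List<Supplier<SideEffectFreeInterceptor>> factories =
                List.of(ConnectionInterceptor::new, RejectingInterceptor::new);
        try {
            configureAll(factories);
        } catch (IllegalArgumentException e) {
            // The partial list is dropped, but it holds no open resources.
        }
        return ConnectionInterceptor.OPEN_CONNECTIONS.get();
    }
}
{code}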
h2. Kafka Consumer Constructor With MaybeOpen Method Call
{code:java}
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // ...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
        // ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
        if (this.log != null) {
            close(0, true);
        }
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
{code}
h2. Kafka Producer Constructor with MaybeOpen Method Call
{code:java}
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
        // ...
        List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
        // ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
{code}
 )

> Add A No Implementation Default Open Method To Consumer and Producer 
> Interceptor Interfaces
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14566
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
