[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Terry Beard updated KAFKA-14565:
--------------------------------
    Summary: Interceptor Resource Leakage Prevention  (was: Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces)

> Interceptor Resource Leakage Prevention
> ---------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>
> h2. PROBLEM
> The Consumer and Producer interceptor interfaces and their corresponding
> Kafka Consumer and Producer constructors do not adequately support cleanup of
> underlying interceptor resources.
> Currently, within the Kafka Consumer and Kafka Producer constructors,
> AbstractConfig.getConfiguredInstances() is delegated responsibility for both
> creating and configuring each interceptor listed in the interceptor.classes
> property, and it returns the configured list of interceptors, for example
> List<ConsumerInterceptor<K,V>>.
> h2. Kafka Consumer Constructor
>
> {code:java}
> try {
>     ....
>     List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>             ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>             ConsumerInterceptor.class,
>             Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> {code}
>
>
> h2. Kafka Producer Constructor
> {code:java}
> try {
>     ....
>     List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>             ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>             ProducerInterceptor.class,
>             Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
> {code}
> This dual responsibility for creation and configuration is problematic when
> multiple interceptors are configured, at least one interceptor's configure
> method creates (or depends on objects that create) threads, connections, or
> other resources that require cleanup, and a subsequent interceptor's
> configure method raises a runtime exception.
> The exception leaks the first interceptor's resources: the interceptor
> container (ConsumerInterceptors or ProducerInterceptors) is never created, so
> the close method of the first interceptor, and indeed of any interceptor, is
> never called.
> h2. Kafka Consumer Constructor
> {code:java}
> try {
>     ....
>     List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>             ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>             ConsumerInterceptor.class,
>             Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
>     .... {code}
> If the above call throws a runtime exception, this.interceptors below is
> never created.
> {code:java}
> this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
> h2. Kafka Producer Constructor
> {code:java}
> try {
>     ....
>     List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>             ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>             ProducerInterceptor.class,
>             Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
> {code}
> If the above call throws a runtime exception, this.interceptors below is
> never created.
> {code:java}
> if (interceptors != null)
>     this.interceptors = interceptors;
> else
>     this.interceptors = new ProducerInterceptors<>(interceptorList);
> ....
> {code}
>
> Although the try/catch blocks in both the Kafka Consumer and Kafka Producer
> constructors call close for resource cleanup,
> {code:java}
> ...
> catch (Throwable t) {
>     // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
>     // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
>     if (this.log != null) {
>         close(0, true);
>     }
>     // now propagate the exception
>     throw new KafkaException("Failed to construct kafka consumer", t);
> } {code}
> the close call in the catch block above never reaches the interceptor
> container's close method below, because *this.interceptors* was never
> created.
> {code:java}
> private void close(long timeoutMs, boolean swallowException) {
>     ....
>     Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
>     .... {code}
> This problem is magnified within a web server cluster, e.g. Confluent's REST
> Proxy server, where thousands of requests containing interceptor
> configuration failures can occur in seconds. The effect is an inadvertent
> DDoS attack: cluster resources are quickly exhausted, disrupting all service
> activities.
> h2. PROPOSAL
> To help ensure that the interceptor containers can invoke each interceptor's
> close method for proper resource cleanup, I propose defining a default open
> method with no implementation, declared to throw a checked exception, on the
> Consumer and Producer interceptor interfaces. This open method will be
> responsible for creating threads and/or objects that use threads,
> connections, or other resources that require cleanup.
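
The failure mode described under PROBLEM can be reproduced without Kafka on the classpath. The following is a minimal sketch, not Kafka code: `DemoInterceptor`, `ResourceHoldingInterceptor`, `FailingInterceptor`, and `createAndConfigure` are hypothetical stand-ins, and `createAndConfigure` only imitates the create-and-configure behavior attributed above to AbstractConfig.getConfiguredInstances().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class InterceptorLeakDemo {
    // Hypothetical stand-in for an interceptor; NOT the Kafka interface.
    interface DemoInterceptor extends AutoCloseable {
        void configure();

        @Override
        void close();
    }

    // Number of simulated resources acquired and not yet released.
    static int openResources = 0;

    // Acquires a simulated resource in configure(), releases it in close().
    static class ResourceHoldingInterceptor implements DemoInterceptor {
        @Override public void configure() { openResources++; }
        @Override public void close() { openResources--; }
    }

    // Fails in configure(), like a misconfigured second interceptor.
    static class FailingInterceptor implements DemoInterceptor {
        @Override public void configure() { throw new IllegalStateException("bad config"); }
        @Override public void close() { }
    }

    // Assumption: creates and configures in one pass, returning the list
    // only if every configure() call succeeds.
    static List<DemoInterceptor> createAndConfigure(List<Supplier<DemoInterceptor>> factories) {
        List<DemoInterceptor> configured = new ArrayList<>();
        for (Supplier<DemoInterceptor> factory : factories) {
            DemoInterceptor interceptor = factory.get();
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    // Returns the number of resources still open after a failed construction.
    static int simulateFailedConstruction() {
        openResources = 0;
        List<Supplier<DemoInterceptor>> factories = new ArrayList<>();
        factories.add(ResourceHoldingInterceptor::new);
        factories.add(FailingInterceptor::new);

        List<DemoInterceptor> container = null; // plays the role of this.interceptors
        try {
            container = createAndConfigure(factories);
        } catch (RuntimeException e) {
            // Same shape as the constructor's catch block: close whatever exists.
            if (container != null) {
                container.forEach(DemoInterceptor::close);
            }
        }
        return openResources;
    }

    public static void main(String[] args) {
        System.out.println("leaked resources: " + simulateFailedConstruction());
    }
}
```

Because the second `configure()` throws before the list is returned, `container` is still null in the catch block, so the first interceptor's `close()` is never reached and its resource stays open.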
> Additionally, because the default open method is empty, implementing it is
> optional: it does nothing for interceptor classes that do not override it.
>
> {code:java}
> package org.apache.kafka.clients.consumer;
>
> import org.apache.kafka.common.Configurable;
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.Map;
>
> public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
>
>     ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
>
>     void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
>
>     default void open() throws Exception {}
>
>     void close();
> }
> {code}
>
>
> {code:java}
> package org.apache.kafka.clients.producer;
>
> import org.apache.kafka.common.Configurable;
>
> public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
>
>     ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
>
>     void onAcknowledgement(RecordMetadata metadata, Exception exception);
>
>     default void open() throws Exception {}
>
>     void close();
> }
> {code}
>
>
> Additionally, the Kafka Consumer/Producer interceptor containers will
> implement a corresponding maybeOpen method, which throws a checked Exception.
> It is named maybeOpen for backwards-compatibility reasons: it must determine
> whether an interceptor's interface contains the newer open method before
> calling it.
> *NOTE*: Developers are encouraged to throw a more specific exception.
>
> {code:java}
> package org.apache.kafka.clients.consumer.internals;
>
> import org.apache.kafka.clients.consumer.ConsumerInterceptor;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.Closeable;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Map;
>
> public class ConsumerInterceptors<K, V> implements Closeable {
>     private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
>     private final List<ConsumerInterceptor<K, V>> interceptors;
>
>     public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
>
>     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
>         ConsumerRecords<K, V> interceptRecords = records;
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptRecords = interceptor.onConsume(interceptRecords);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue calling other interceptors
>                 log.warn("Error executing interceptor onConsume callback", e);
>             }
>         }
>         return interceptRecords;
>     }
>
>     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.onCommit(offsets);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, just log
>                 log.warn("Error executing interceptor onCommit callback", e);
>             }
>         }
>     }
>
>     @Override
>     public void close() {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close consumer interceptor ", e);
>             }
>         }
>     }
>
>     public List<ConsumerInterceptor<K, V>> getInterceptors() {
>         return interceptors;
>     }
>
>     /**
>      * Only interceptors which implement {@link ConsumerInterceptor#open()} are called by the container.
>      * This is for backwards compatibility, as older interceptors do not contain the default open().
>      */
>     public void maybeOpen() throws Exception {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 // compare method names with equals(), not ==
>                 if (Arrays.stream(interceptor.getClass().getMethods())
>                         .anyMatch(method -> "open".equals(method.getName()))) {
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open consumer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> } {code}
>
>
>
> {code:java}
> package org.apache.kafka.clients.producer.internals;
>
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.record.RecordBatch;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.Closeable;
> import java.util.Arrays;
> import java.util.List;
>
> public class ProducerInterceptors<K, V> implements Closeable {
>     private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
>     private final List<ProducerInterceptor<K, V>> interceptors;
>
>     public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
>
>     public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
>         ProducerRecord<K, V> interceptRecord = record;
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptRecord = interceptor.onSend(interceptRecord);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue calling other interceptors
>                 // be careful not to throw exception from here
>                 if (record != null)
>                     log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
>                 else
>                     log.warn("Error executing interceptor onSend callback", e);
>             }
>         }
>         return interceptRecord;
>     }
>
>     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.onAcknowledgement(metadata, exception);
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement callback", e);
>             }
>         }
>     }
>
>     public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 if (record == null && interceptTopicPartition == null) {
>                     interceptor.onAcknowledgement(null, exception);
>                 } else {
>                     if (interceptTopicPartition == null) {
>                         interceptTopicPartition = extractTopicPartition(record);
>                     }
>                     interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
>                             RecordBatch.NO_TIMESTAMP, -1, -1), exception);
>                 }
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement callback", e);
>             }
>         }
>     }
>
>     public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
>         return new TopicPartition(record.topic(),
>                 record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
>     }
>
>     @Override
>     public void close() {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close producer interceptor ", e);
>             }
>         }
>     }
>
>     /**
>      * Only interceptors which implement {@link ProducerInterceptor#open()} are called by the container.
>      * This is for backwards compatibility, as older interceptors do not contain the default open().
>      */
>     public void maybeOpen() throws Exception {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 // compare method names with equals(), not ==
>                 if (Arrays.stream(interceptor.getClass().getMethods())
>                         .anyMatch(method -> "open".equals(method.getName()))) {
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open producer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> }
> {code}
> In summary, the overall workflow is as follows. After
> AbstractConfig.getConfiguredInstances() returns the configured interceptor
> instances, the Kafka Consumer/Producer constructor calls the maybeOpen method
> of its interceptor container.
> If an interceptor's open method throws during the maybeOpen call, the client
> constructor's try/catch calls the interceptor container's close method, which
> in turn loops through the interceptors and calls each one's close method to
> clean up the resources allocated in open.
>
> If an exception occurs in the configure method, all objects will simply be
> garbage collected, because configure must no longer be used for creating
> threads and/or objects that use threads, connections, or other resources that
> require cleanup.
> h2. Kafka Consumer Constructor maybeOpen example
>
> {code:java}
> ...
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeOpen();
> ...{code}
> h2. Kafka Producer maybeOpen example
> {code:java}
> ...
> List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
> if (interceptors != null)
>     this.interceptors = interceptors;
> else
>     this.interceptors = new ProducerInterceptors<>(interceptorList);
> this.interceptors.maybeOpen();
> ...{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
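
The proposed workflow (configure without acquiring resources, build the container, open, and close everything on failure) can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the Kafka implementation: `Interceptor`, `Container`, and both interceptor classes are hypothetical stand-ins, and the container's `maybeOpen` here simply calls `open()` on every interceptor, omitting the reflection check shown in the issue.

```java
import java.util.ArrayList;
import java.util.List;

class InterceptorOpenDemo {
    // Hypothetical stand-in mirroring the proposed interface shape.
    interface Interceptor extends AutoCloseable {
        void configure();                        // must not acquire resources

        default void open() throws Exception {}  // optional resource acquisition

        @Override
        void close();
    }

    // Number of simulated resources acquired and not yet released.
    static int openResources = 0;

    // Acquires its simulated resource in open() rather than configure().
    static class ResourceHoldingInterceptor implements Interceptor {
        private boolean opened = false;

        @Override public void configure() { }
        @Override public void open() { openResources++; opened = true; }
        @Override public void close() {
            if (opened) { openResources--; opened = false; }
        }
    }

    // Fails in open(), e.g. because a connection cannot be established.
    static class FailingInterceptor implements Interceptor {
        @Override public void configure() { }
        @Override public void open() throws Exception { throw new Exception("cannot connect"); }
        @Override public void close() { }
    }

    // Stand-in for the interceptor container (ConsumerInterceptors/ProducerInterceptors).
    static class Container implements AutoCloseable {
        private final List<Interceptor> interceptors;

        Container(List<Interceptor> interceptors) { this.interceptors = interceptors; }

        // Simplified: opens every interceptor and propagates the first failure.
        void maybeOpen() throws Exception {
            for (Interceptor interceptor : interceptors) {
                interceptor.open();
            }
        }

        // Closes every interceptor, including ones that never opened.
        @Override public void close() {
            for (Interceptor interceptor : interceptors) {
                try {
                    interceptor.close();
                } catch (RuntimeException e) {
                    // keep closing the remaining interceptors
                }
            }
        }
    }

    // Returns the number of resources still open after a failed construction.
    static int simulateFailedConstruction() {
        openResources = 0;
        Container container = null; // plays the role of this.interceptors
        try {
            List<Interceptor> configured = new ArrayList<>();
            configured.add(new ResourceHoldingInterceptor());
            configured.add(new FailingInterceptor());
            for (Interceptor interceptor : configured) {
                interceptor.configure();
            }
            container = new Container(configured); // exists before anything is opened
            container.maybeOpen();                 // second interceptor throws here
        } catch (Exception e) {
            if (container != null) {
                container.close();
            }
        }
        return openResources;
    }

    public static void main(String[] args) {
        System.out.println("leaked resources: " + simulateFailedConstruction());
    }
}
```

Because the container is assigned before any `open()` call, the catch block can reach it and release the first interceptor's resource. Note that `close()` may be called on an interceptor whose `open()` never ran, so in this sketch `close()` is written to tolerate that.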