Thanks for the KIP! I sympathize with the issue you're facing and with
John's reluctance to let perfect be the enemy of good, and if KIP freeze
were tomorrow, I think this would be good enough. Given that we still have
some time to work with, I'd like to propose an alternative approach and see
what your thoughts are.

There are a few issues with the current client APIs that are closely
related to the KIP:
1. Too many constructors (there are currently four each for KafkaProducer
and KafkaConsumer, yet they all do basically the same thing)
2. Lack of type safety with interceptors (you have no way to enforce at
compile time that your ProducerInterceptor<String, Integer> is used with a
Serializer<String> and Serializer<Integer>, for example)
3. Inflexibility and inconsistency with instantiation of pluggable
interfaces (you can bring your own (de)serializers, but everything else
gets instantiated and configured for you at producer/consumer construction)

The KIP as it exists now will only address item 3, and will exacerbate
item 1.

In addition, there are a few new issues introduced by the KIP as it exists
now:
1. Tighter coupling between the ProducerConfig/ConsumerConfig classes and
the KafkaProducer/KafkaConsumer classes. Any change we make in the future
that breaks either of these config classes in unexpected ways (but which
does not break the KafkaProducer/KafkaConsumer constructors that do not
accept these classes as parameters) will now have a much higher chance to
also break a user's entire producer/consumer application.
2. Complexity for users like yourself who would like to override behavior
in a ProducerConfig/ConsumerConfig in order to inject pre-instantiated
dependencies. The example in the KIP overrides
AbstractConfig::getConfiguredInstances [1] in order to achieve this. But
there are two other overloaded variants of getConfiguredInstances, and two
AbstractConfig::getConfiguredInstance methods that also exist. We'd either
need to establish a dependency graph between these methods (e.g., some
methods are guaranteed to invoke another overloaded variant) as part of the
public API for the AbstractConfig, or users would need to override every
single one of these methods in order to ensure that their code won't break
at runtime after bumping their Kafka version.
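
To make that second point concrete, here is a minimal sketch of how an
override can be silently bypassed. SketchConfig is a hypothetical stand-in
for AbstractConfig (not the real class), the two overloads are simplified,
and clientV1/clientV2 are invented to represent two Kafka versions whose
internals happen to call different overloads:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AbstractConfig; names and behavior are illustrative only.
class SketchConfig {
    // Overload 1: pretends to instantiate plugins reflectively.
    List<String> getConfiguredInstances(String key) {
        List<String> out = new ArrayList<>();
        out.add("reflective:" + key);
        return out;
    }

    // Overload 2: an independent implementation that does NOT delegate to overload 1.
    List<String> getConfiguredInstances(String key, Map<String, Object> overrides) {
        List<String> out = new ArrayList<>();
        out.add("reflective:" + key);
        return out;
    }
}

// A user subclass that injects a pre-built instance by overriding only overload 1.
class InjectingConfig extends SketchConfig {
    @Override
    List<String> getConfiguredInstances(String key) {
        List<String> out = new ArrayList<>();
        out.add("injected:" + key);
        return out;
    }
}

class OverloadFragility {
    // Imagined client internals in one release: calls overload 1.
    static List<String> clientV1(SketchConfig c) {
        return c.getConfiguredInstances("interceptor.classes");
    }

    // Imagined client internals after a refactor: calls overload 2 instead.
    static List<String> clientV2(SketchConfig c) {
        return c.getConfiguredInstances("interceptor.classes", Collections.emptyMap());
    }
}
```

With the same InjectingConfig, clientV1 sees the injected instance while
clientV2 falls back to reflective instantiation, and nothing fails at
compile time to warn the user.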

I think introducing the builder pattern for KafkaProducer and KafkaConsumer
would alleviate all of these issues. As a rough draft of what the API might
look like for KafkaProducer:

public class Builder<K, V> {
    private final Map<String, Object> props;
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private List<ProducerInterceptor<K, V>> interceptors;
    private Map<String, Object> configuredInstances;
    private Map<String, List<Object>> configuredInstanceLists;

    public Builder(Map<String, Object> props) {
        this.props = props;
        this.interceptors = new ArrayList<>();
        this.configuredInstances = new HashMap<>();
        this.configuredInstanceLists = new HashMap<>();
    }

    // Use this serializer, if non-null
    // Will take precedence over any serializer class specified in the
    // properties for this producer
    public Builder<K, V> withKeySerializer(Serializer<K> serializer) {
        this.keySerializer = serializer;
        return this;
    }

    // Same semantics as withKeySerializer, but for the value serializer
    public Builder<K, V> withValueSerializer(Serializer<V> serializer) {
        this.valueSerializer = serializer;
        return this;
    }

    // Use these interceptors (has no effect if null)
    // Each must already be configured
    // Will be combined with any interceptor classes also specified in the
    // properties for this producer
    public Builder<K, V> withInterceptors(
            List<ProducerInterceptor<K, V>> interceptors) {
        if (interceptors != null) {
            this.interceptors.addAll(interceptors);
        }
        return this;
    }

    // Use this plugin instance, if non-null
    // Must already be configured
    // Will take precedence over any plugin class specified for the same
    // property in the properties for this producer (wording here needs
    // work but you get the idea)
    public Builder<K, V> withConfiguredInstance(String property,
                                                Object instance) {
        this.configuredInstances.put(property, instance);
        return this;
    }

    // Use these plugin instances (has no effect if null)
    // Each must already be configured
    // Will be combined with any plugin classes also specified for the same
    // property in the properties for this producer
    public Builder<K, V> withConfiguredInstances(String property,
                                                 List<Object> instances) {
        this.configuredInstanceLists.put(property, instances);
        return this;
    }

    public KafkaProducer<K, V> build() { ... }
}
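
For a feel of how calling code might look, here is a self-contained sketch.
To keep it compilable without the Kafka client on the classpath, it uses
hypothetical stand-ins (SketchSerializer, SketchInterceptor, SketchProducer)
rather than the real Serializer, ProducerInterceptor and KafkaProducer, and
it omits the configured-instance methods. Treat it as an illustration of
the shape of the API, not a proposed implementation:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, deliberately minimal stand-ins for the Kafka interfaces.
interface SketchSerializer<T> { byte[] serialize(T data); }
interface SketchInterceptor<K, V> { String describe(K key, V value); }

final class SketchProducer<K, V> {
    final Map<String, Object> props;
    final SketchSerializer<K> keySerializer;
    final SketchSerializer<V> valueSerializer;
    final List<SketchInterceptor<K, V>> interceptors;

    // Only the builder can construct a producer, so there is one entry point.
    private SketchProducer(Builder<K, V> b) {
        this.props = b.props;
        this.keySerializer = b.keySerializer;
        this.valueSerializer = b.valueSerializer;
        this.interceptors = new ArrayList<>(b.interceptors);
    }

    static final class Builder<K, V> {
        private final Map<String, Object> props;
        private SketchSerializer<K> keySerializer;
        private SketchSerializer<V> valueSerializer;
        private final List<SketchInterceptor<K, V>> interceptors = new ArrayList<>();

        Builder(Map<String, Object> props) { this.props = new HashMap<>(props); }

        Builder<K, V> withKeySerializer(SketchSerializer<K> s) {
            this.keySerializer = s;
            return this;
        }

        Builder<K, V> withValueSerializer(SketchSerializer<V> s) {
            this.valueSerializer = s;
            return this;
        }

        // Has no effect if null, mirroring the draft above.
        Builder<K, V> withInterceptors(List<SketchInterceptor<K, V>> list) {
            if (list != null) {
                this.interceptors.addAll(list);
            }
            return this;
        }

        SketchProducer<K, V> build() { return new SketchProducer<>(this); }
    }
}

class BuilderUsage {
    static SketchProducer<String, Integer> example() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");

        SketchSerializer<String> keySer = s -> s.getBytes(StandardCharsets.UTF_8);
        SketchSerializer<Integer> valueSer = i -> ByteBuffer.allocate(4).putInt(i).array();
        List<SketchInterceptor<String, Integer>> interceptors = new ArrayList<>();
        interceptors.add((k, v) -> k + "=" + v);

        // K and V are fixed once, so every component must agree on them.
        return new SketchProducer.Builder<String, Integer>(props)
                .withKeySerializer(keySer)
                .withValueSerializer(valueSer)
                .withInterceptors(interceptors)
                .build();
    }
}
```

Because every with* method is typed against the builder's K and V, passing
a List<SketchInterceptor<Integer, String>> here would be rejected by the
compiler, which is the type safety that item 2 above says is missing today.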


[1] -



