Hi, everyone,

I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new java
producer takes a byte array for both the key and the value. While this api
is simple, it pushes the serialization logic into the application. This
makes it hard to reason about what type of data is being sent to Kafka and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite involved
since it might need to register the Avro schema in some remote registry and
maintain a schema cache locally, etc. Without a serialization api, it's
impossible to share such an implementation so that people can easily reuse it.
We sort of overlooked this implication during the initial discussion of the
producer api.

So, I'd like to propose an api change to the new producer by adding back
the serializer api, similar to what we had in the old producer. Specifically,
the proposed api changes are the following.

First, we change KafkaProducer to take generic types K and V for the key
and the value, respectively.

public class KafkaProducer<K,V> implements Producer<K,V> {

    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
                                       Callback callback);

    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}
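
As a rough usage sketch (the String types here are just for illustration;
how the producer learns which serializers to use is covered by the configs
below):

    // props holds the producer configuration, including the serializer
    // configs described in the next section.
    // The producer is now parameterized by the key and value types
    // instead of taking raw byte arrays.
    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    // The record carries a typed key and value; serialization happens
    // inside the producer via the configured serializers.
    ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("my-topic", "user-42", "some payload");

    Future<RecordMetadata> result = producer.send(record);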

Second, we add two new configs, one for the key serializer and another for
the value serializer. Both serializers will default to the byte array
implementation.

public class ProducerConfig extends AbstractConfig {

    // ... existing config definitions elided ...
    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
            "org.apache.kafka.clients.producer.ByteArraySerializer",
            Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
            "org.apache.kafka.clients.producer.ByteArraySerializer",
            Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
}
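
For example, an application that wants string keys and values would only
need to set the two new configs when creating the producer (the property
names and the StringSerializer class below are assumptions for
illustration; if unset, both configs fall back to the byte array
serializer):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // Assumed config names for the two new serializer configs.
    props.put("key.serializer", "org.apache.kafka.clients.producer.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.clients.producer.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<String, String>(props);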

Both serializers will implement the following interface.

public interface Serializer<T> extends Configurable {
    public byte[] serialize(String topic, T data, boolean isKey);

    public void close();
}

This is more or less the same as what's in the old producer. The slight
differences are (1) the serializer now only requires a parameter-less
constructor; (2) the serializer has a configure() and a close() method for
initialization and cleanup, respectively; (3) the serialize() method
additionally takes the topic and an isKey indicator, both of which are
useful for things like schema registration.
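
As an illustration, a simple string serializer implementing this interface
might look like the following (a hypothetical sketch; the
"serializer.encoding" config name is made up for this example):

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    public class StringSerializer implements Serializer<String> {

        private String encoding = "UTF-8";

        @Override
        public void configure(Map<String, ?> configs) {
            // configure() comes from Configurable; pick up an optional
            // encoding override (hypothetical config name).
            Object enc = configs.get("serializer.encoding");
            if (enc instanceof String)
                encoding = (String) enc;
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            // topic and isKey are available for things like per-topic
            // schema lookups; a plain string serializer doesn't need them.
            if (data == null)
                return null;
            try {
                return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("Unsupported encoding: " + encoding, e);
            }
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }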

The detailed changes are included in KAFKA-1797. For completeness, I also
made the corresponding changes to the new java consumer api.

Note that the proposed api changes are incompatible with what's in the
0.8.2 branch. However, if those api changes are beneficial, it's probably
better to include them in the 0.8.2 release now than to introduce an
incompatible change later.

I'd like to discuss mainly two things in this thread.
1. Do people feel that the proposed api changes are reasonable?
2. Are there any concerns about including the api changes in the 0.8.2 final
release?

Thanks,

Jun
