Jun, Thanks for summarizing this - it helps confirm for me that I did not misunderstand anything in this thread so far; and that I disagree with the premise that the steps in using the current byte-oriented API are cumbersome or inflexible. It involves instantiating the K-V serializers in code (as opposed to config) and an extra (but explicit - i.e., making it very clear to the user) but simple call to serialize before sending.
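[Editorial note: the byte-oriented flow described above - instantiating the K-V serializers in code and making one explicit serialize call before send - can be sketched roughly as follows. This is a minimal sketch; `StringSerializer` and `ByteProducer` are hypothetical stand-ins for illustration, not the actual Kafka client classes.]

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical shared serializer, instantiated in code (not via config).
class StringSerializer {
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Stub standing in for the byte-oriented producer API: it only accepts bytes.
class ByteProducer {
    public byte[] lastValue; // captured here for illustration only
    public void send(String topic, byte[] key, byte[] value) {
        this.lastValue = value; // a real client would enqueue the record
    }
}

public class ByteOrientedUsage {
    public static void main(String[] args) {
        // Step 1: instantiate the K-V serializers in code.
        StringSerializer keySerializer = new StringSerializer();
        StringSerializer valueSerializer = new StringSerializer();
        ByteProducer producer = new ByteProducer();

        // Step 2: the extra-but-explicit serialize call before sending.
        byte[] key = keySerializer.serialize("user-42");
        byte[] value = valueSerializer.serialize("page-view");
        producer.send("events", key, value);

        System.out.println(Arrays.equals(producer.lastValue, value)); // prints true
    }
}
```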
The point about downstream queries breaking can happen just as well with the implicit serializers/deserializers - since ultimately people have to instantiate the specific type in their code and if they want to send it they will. I think adoption is also equivalent since people will just instantiate whatever serializer/deserializer they want in one line. Plugging in a new serializer implementation does require a code change, but that can also be avoided via a config driven factory. So I'm still +0 on the change but I'm definitely not against moving forward with the changes. i.e., unless there is any strong -1 on the proposal from anyone else. Thanks, Joel > With a byte array interface, of course there is nothing that one can't do. > However, the real question is that whether we want to encourage people to > use it this way or not. Being able to flow just bytes is definitely easier > to get started. That's why many early adopters choose to do it that way. > However, it's often the case that they start feeling the pain later when > some producers change the data format. Their Hive/Pig queries start to > break and it's a painful process to have the issue fixed. So, the purpose > of this api change is really to encourage people to standardize on a single > serializer/deserializer that supports things like data validation and > schema evolution upstream in the producer. Now, suppose there is an Avro > serializer/deserializer implementation. How do we make it easy for people > to adopt? If the serializer is part of the api, we can just say, wire in > the Avro serializer for key and/or value in the config and then you can > start sending Avro records to the producer. If the serializer is not part > of the api, we have to say, first instantiate a key and/or value serializer > this way, send the key to the key serializer to get the key bytes, send the > value to the value serializer to get the value bytes, and finally send the > bytes to the producer. 
The former will be simpler and likely makes the > adoption easier. > > Thanks, > > Jun > > On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > > > > Documentation is inevitable even if the serializer/deserializer is > > part of the API - since the user has to set it up in the configs. So > > again, you can only encourage people to use it through documentation. > > The simpler byte-oriented API seems clearer to me because anyone who > > needs to send (or receive) a specific data type will _be forced to_ > > (or actually, _intuitively_) select a serializer (or deserializer) and > > will definitely pick an already available implementation if a good one > > already exists. > > > > Sorry I still don't get it and this is really the only sticking point > > for me, albeit a minor one (which is why I have been +0 all along on > > the change). I (and I think many others) would appreciate it if > > someone can help me understand this better. So I will repeat the > > question: What "usage pattern" cannot be supported easily by the > > simpler API without adding burden on the user? > > > > Thanks, > > > > Joel > > > > On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote: > > > Joel, > > > > > > It's just that if the serializer/deserializer is not part of the API, you > > > can only encourage people to use it through documentation. However, not > > > everyone will read the documentation if it's not directly used in the > > API. > > > > > > Thanks, > > > > > > Jun > > > > > > On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote: > > > > > > > (sorry about the late follow-up - I'm traveling most of this > > > > month) > > > > > > > > I'm likely missing something obvious, but I find the following to be a > > > > somewhat vague point that has been mentioned more than once in this > > > > thread without a clear explanation. 
i.e., why is it hard to share a > > > > serializer/deserializer implementation and just have the clients call > > > > it before a send/receive? What "usage pattern" cannot be supported by > > > > the simpler API? > > > > > > > > > 1. Can we keep the serialization semantics outside the Producer > > interface > > > > > and have simple bytes in / bytes out for the interface (This is what > > we > > > > > have today). > > > > > > > > > > The points for this is to keep the interface simple and usage easy to > > > > > understand. The points against this is that it gets hard to share > > common > > > > > usage patterns around serialization/message validations for the > > future. > > > > > > > > > > > > On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote: > > > > > Thank you Jay. I agree with the issue that you point out w.r.t. paired > > > > > serializers. I also think having mixed serialization types is rare. To > > get > > > > > the current behavior, one can simply use a ByteArraySerializer. This > > is > > > > > best understood by talking with many customers and you seem to have > > done > > > > > that. I am convinced about the change. > > > > > > > > > > For the rest who gave -1 or 0 for this proposal, do the answers > > for the > > > > > three points (updated) below seem reasonable? Are these explanations > > > > > convincing? > > > > > > > > > > > > > > > 1. Can we keep the serialization semantics outside the Producer > > interface > > > > > and have simple bytes in / bytes out for the interface (This is what > > we > > > > > have today). > > > > > > > > > > The points for this is to keep the interface simple and usage easy to > > > > > understand. The points against this is that it gets hard to share > > common > > > > > usage patterns around serialization/message validations for the > > future. > > > > > > > > > > 2. Can we create a wrapper producer that does the serialization and > > have > > > > > different variants of it for different data formats? 
> > > > > > > > > > The points > > > > > against this is that it duplicates the API, increases the surface > > area > > > > and > > > > > creates redundancy for a minor addition. > > > > > > > > > > 3. Do we need to support different data types per record? The current > > > > > interface (bytes in/bytes out) lets you instantiate one producer and > > use > > > > > it to send multiple data formats. There seem to be some valid use > > cases > > > > > for this. > > > > > > > > > > > > > > > Mixed serialization types are rare based on interactions with > > customers. > > > > > To get the current behavior, one can simply use a > > ByteArraySerializer. > > > > > > > > > > On 12/5/14 5:00 PM, "Jay Kreps" <j...@confluent.io> wrote: > > > > > > > > > > >Hey Sriram, > > > > > > > > > > > >Thanks! I think this is a very helpful summary. > > > > > > > > > > > >Let me try to address your point about passing in the serde at send > > > > time. > > > > > > > > > > > >I think the first objection is really to the paired key/value > > serializer > > > > > >interfaces. This leads to kind of a weird combinatorial thing where > > you > > > > > >would have an avro/avro serializer, a string/avro serializer, a pb/pb > > > > > >serializer, and a string/pb serializer, and so on. But your proposal > > > > would > > > > > >work as well with separate serializers for key and value. > > > > > > > > > > > >I think the downside is just the one you call out--that this is a > > corner > > > > > >case and you end up with two versions of all the apis to support it. > > > > This > > > > > >also makes the serializer api more annoying to implement. I think > > the > > > > > >alternative solution to this case and any other we can give people > > is > > > > just > > > > > >configuring ByteArraySerializer which gives you basically the api > > that > > > > you > > > > > >have now with byte arrays. 
If this is incredibly common then this > > would > > > > be > > > > > >a silly solution, but I guess the belief is that these cases are > > rare > > > > and > > > > > >a > > > > > >really well implemented avro or json serializer should be 100% of > > what > > > > > >most > > > > > >people need. > > > > > > > > > > > >In practice the cases that actually mix serialization types in a > > single > > > > > >stream are pretty rare I think just because the consumer then has > > the > > > > > >problem of guessing how to deserialize, so most of these will end up > > > > with > > > > > >at least some marker or schema id or whatever that tells you how to > > read > > > > > >the data. Arguably, this mixed serialization with marker is itself a > > > > > >serializer type and should have a serializer of its own... > > > > > > > > > > > >-Jay > > > > > > > > > > > >On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian < > > > > > >srsubraman...@linkedin.com.invalid> wrote: > > > > > > > > > > > >> This thread has diverged multiple times now and it would be worth > > > > > >> summarizing them. > > > > > >> > > > > > >> There seem to be the following points of discussion - > > > > > >> > > > > > >> 1. Can we keep the serialization semantics outside the Producer > > > > > >>interface > > > > > >> and have simple bytes in / bytes out for the interface (This is > > what > > > > we > > > > > >> have today). > > > > > >> > > > > > >> The points for this is to keep the interface simple and usage > > easy to > > > > > >> understand. The points against this is that it gets hard to share > > > > common > > > > > >> usage patterns around serialization/message validations for the > > > > future. > > > > > >> > > > > > >> 2. Can we create a wrapper producer that does the serialization > > and > > > > have > > > > > >> different variants of it for different data formats? > > > > > >> > > > > > >> The points for this is again to keep the main API clean. 
The > > points > > > > > >> against this is that it duplicates the API, increases the surface > > area > > > > > >>and > > > > > >> creates redundancy for a minor addition. > > > > > >> > > > > > >> 3. Do we need to support different data types per record? The > > current > > > > > >> interface (bytes in/bytes out) lets you instantiate one producer > > and > > > > use > > > > > >> it to send multiple data formats. There seem to be some valid use > > > > cases > > > > > >> for this. > > > > > >> > > > > > >> I have still not seen a strong argument against not having this > > > > > >> functionality. Can someone provide their views on why we don't > > need > > > > this > > > > > >> support that is possible with the current API? > > > > > >> > > > > > >> One possible approach for the per record serialization would be to > > > > > >>define > > > > > >> > > > > > >> public interface SerDe<K,V> { > > > > > >> public byte[] serializeKey(K key); > > > > > >> > > > > > >> public K deserializeKey(byte[] key); > > > > > >> > > > > > >> public byte[] serializeValue(V value); > > > > > >> > > > > > >> public V deserializeValue(byte[] value); > > > > > >> } > > > > > >> > > > > > >> This would be used by both the Producer and the Consumer. > > > > > >> > > > > > >> The send APIs can then be > > > > > >> > > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record); > > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, > > > > Callback > > > > > >> callback); > > > > > >> > > > > > >> > > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, > > > > > >>SerDe<K,V> > > > > > >> serde); > > > > > >> > > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, > > > > > >>SerDe<K,V> > > > > > >> serde, Callback callback); > > > > > >> > > > > > >> > > > > > >> A default SerDe can be set in the config. The producer would use > > the > > > > > >> default from the config if the non-serde send APIs are used. 
The > > > > > >>downside > > > > > >> to this approach is that we would need to have four variants of > > Send > > > > API > > > > > >> for the Producer. > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote: > > > > > >> > > > > > >> >Jiangjie, > > > > > >> > > > > > > >> >The issue with adding the serializer in ProducerRecord is that > > you > > > > > >>need to > > > > > >> >implement all combinations of serializers for key and value. So, > > > > > >>instead > > > > > >> >of > > > > > >> >just implementing int and string serializers, you will have to > > > > > >>implement > > > > > >> >all 4 combinations. > > > > > >> > > > > > > >> >Adding a new producer constructor like Producer<K, > > > > V>(KeySerializer<K>, > > > > > >> >ValueSerializer<V>, Properties properties) can be useful. > > > > > >> > > > > > > >> >Thanks, > > > > > >> > > > > > > >> >Jun > > > > > >> > > > > > > >> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin > > > > > >><j...@linkedin.com.invalid> > > > > > >> >wrote: > > > > > >> > > > > > > >> >> > > > > > >> >> I'm just thinking instead of binding serialization with > > producer, > > > > > >> >>another > > > > > >> >> option is to bind serializer/deserializer with > > > > > >> >> ProducerRecord/ConsumerRecord (please see the detail proposal > > > > below.) > > > > > >> >> The arguments for this option is: > > > > > >> >> A. A single producer could send different message > > types. > > > > > >>There > > > > > >> >>are > > > > > >> >> several use cases in LinkedIn for per record serializer > > > > > >> >> - In Samza, there are some in-stream order-sensitive > > > > control > > > > > >> >> messages > > > > > >> >> having different deserializer from other messages. > > > > > >> >> - There are use cases which need support for sending > > both > > > > > >>Avro > > > > > >> >> messages > > > > > >> >> and raw bytes. 
> > > > > >> >> - Some use cases need to deserialize some Avro > > messages > > > > into > > > > > >> >> generic > > > > > >> >> record and some other messages into specific record. > > > > > >> >> B. In current proposal, the serializer/deserializer is > > > > > >> >>instantiated > > > > > >> >> according to config. Compared with that, binding serializer > > with > > > > > >> >> ProducerRecord and ConsumerRecord is less error prone. > > > > > >> >> > > > > > >> >> > > > > > >> >> This option includes the following changes: > > > > > >> >> A. Add serializer and deserializer interfaces to > > replace > > > > > >> >>serializer > > > > > >> >> instance from config. > > > > > >> >> Public interface Serializer <K, V> { > > > > > >> >> public byte[] serializeKey(K key); > > > > > >> >> public byte[] serializeValue(V value); > > > > > >> >> } > > > > > >> >> Public interface Deserializer <K, V> { > > > > > >> >> Public K deserializeKey(byte[] key); > > > > > >> >> public V deserializeValue(byte[] > > value); > > > > > >> >> } > > > > > >> >> > > > > > >> >> B. Make ProducerRecord and ConsumerRecord abstract > > class > > > > > >> >> implementing > > > > > >> >> Serializer <K, V> and Deserializer <K, V> respectively. > > > > > >> >> Public abstract class ProducerRecord <K, V> > > > > > >>implements > > > > > >> >> Serializer <K, V> > > > > > >> >> {...} > > > > > >> >> Public abstract class ConsumerRecord <K, V> > > > > > >>implements > > > > > >> >> Deserializer <K, > > > > > >> >> V> {...} > > > > > >> >> > > > > > >> >> C. Instead of instantiating the serializer/deserializer > > from > > > > > >> >>config, > > > > > >> >> let > > > > > >> >> concrete ProducerRecord/ConsumerRecord extend the abstract > > class > > > > and > > > > > >> >> override the serialize/deserialize methods. > > > > > >> >> > > > > > >> >> Public class AvroProducerRecord extends > > > > > >>ProducerRecord > > > > > >> >> <String, > > > > > >> >> GenericRecord> { > > > > > >> >> ... 
> > > > > >> >> @Override > > > > > >> >> Public byte[] serializeKey(String key) > > {…} > > > > > >> >> @Override > > > > > >> >> public byte[] > > serializeValue(GenericRecord > > > > > >> >>value); > > > > > >> >> } > > > > > >> >> > > > > > >> >> Public class AvroConsumerRecord extends > > > > > >>ConsumerRecord > > > > > >> >> <String, > > > > > >> >> GenericRecord> { > > > > > >> >> ... > > > > > >> >> @Override > > > > > >> >> Public K deserializeKey(byte[] key) {…} > > > > > >> >> @Override > > > > > >> >> public V deserializeValue(byte[] > > value); > > > > > >> >> } > > > > > >> >> > > > > > >> >> D. The producer API changes to > > > > > >> >> Public class KafkaProducer { > > > > > >> >> ... > > > > > >> >> > > > > > >> >> Future<RecordMetadata> send > > (ProducerRecord > > > > > >><K, > > > > > >> >>V> > > > > > >> >> record) { > > > > > >> >> ... > > > > > >> >> byte[] key = > > > > > >>record.serializeKey(record.key); > > > > > >> >> byte[] value = > > > > > >> >> record.serializeValue(record.value); > > > > > >> >> BytesProducerRecord > > > > > >>bytesProducerRecord > > > > > >> >>= > > > > > >> >> new > > > > > >> >> BytesProducerRecord(topic, partition, key, value); > > > > > >> >> ... > > > > > >> >> } > > > > > >> >> ... > > > > > >> >> } > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> >> We also had some brainstorming in LinkedIn and here is the > > feedback: > > > > > >> >> > > > > > >> >> If the community decides to add the serialization back to new > > > > > >>producer, > > > > > >> >> besides current proposal which changes new producer API to be a > > > > > >> >>template, > > > > > >> >> there are some other options raised during our discussion: > > > > > >> >> 1) Rather than change current new producer API, we can > > > > > >>provide a > > > > > >> >> wrapper > > > > > >> >> of current new producer (e.g. KafkaSerializedProducer) and > > make it > > > > > >> >> available to users. 
As there is value in the simplicity of > current > > > > > >>API. > > > > > >> >> > > > > > >> >> 2) If we decide to go with templated new producer API, > > > > > >> >>according > > > > > >> >> to > > > > > >> >> experience in LinkedIn, it might be worth considering > > instantiating > > > > the > > > > > >> >> serializer in code instead of from config so we can avoid > > runtime > > > > > >>errors > > > > > >> >> due to dynamic instantiation from config, which is more error > > > > prone. > > > > > >>If > > > > > >> >> that is the case, the producer API could be changed to > > something > > > > > >>like: > > > > > >> >> producer = new Producer<K, V>(KeySerializer<K>, > > > > > >> >> ValueSerializer<V>) > > > > > >> >> > > > > > >> >> --Jiangjie (Becket) Qin > > > > > >> >> > > > > > >> >> > > > > > >> >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote: > > > > > >> >> > > > > > >> >> >Hi, Everyone, > > > > > >> >> > > > > > > >> >> >I'd like to start a discussion on whether it makes sense to > > add > > > > the > > > > > >> >> >serializer api back to the new java producer. Currently, the > > new > > > > > >>java > > > > > >> >> >producer takes a byte array for both the key and the value. > > While > > > > > >>this > > > > > >> >>api > > > > > >> >> >is simple, it pushes the serialization logic into the > > application. > > > > > >>This > > > > > >> >> >makes it hard to reason about what type of data is being sent > > to > > > > > >>Kafka > > > > > >> >>and > > > > > >> >> >also makes it hard to share an implementation of the > > serializer. > > > > For > > > > > >> >> >example, to support Avro, the serialization logic could be > > quite > > > > > >> >>involved > > > > > >> >> >since it might need to register the Avro schema in some remote > > > > > >>registry > > > > > >> >> >and > > > > > >> >> >maintain a schema cache locally, etc. 
Without a serialization > api, > > > > > >>it's > > > > > >> >> >impossible to share such an implementation so that people can > > > > easily > > > > > >> >> >reuse it. > > > > > >> >> >We sort of overlooked this implication during the initial > > > > > >>discussion of > > > > > >> >> >the > > > > > >> >> >producer api. > > > > > >> >> > > > > > > >> >> >So, I'd like to propose an api change to the new producer by > > > > adding > > > > > >> >>back > > > > > >> >> >the serializer api similar to what we had in the old producer. > > > > > >> >>Specifically, > > > > > >> >> >the proposed api changes are the following. > > > > > >> >> > > > > > > >> >> >First, we change KafkaProducer to take generic types K and V > > for > > > > the > > > > > >> >>key > > > > > >> >> >and the value, respectively. > > > > > >> >> > > > > > > >> >> >public class KafkaProducer<K,V> implements Producer<K,V> { > > > > > >> >> > > > > > > >> >> > public Future<RecordMetadata> send(ProducerRecord<K,V> > > record, > > > > > >> >> >Callback > > > > > >> >> >callback); > > > > > >> >> > > > > > > >> >> > public Future<RecordMetadata> send(ProducerRecord<K,V> > > > > record); > > > > > >> >> >} > > > > > >> >> > > > > > > >> >> >Second, we add two new configs, one for the key serializer and > > > > > >>another > > > > > >> >>for > > > > > >> >> >the value serializer. Both serializers will default to the > > byte > > > > > >>array > > > > > >> >> >implementation. 
> > > > > >> >> > > > > > > >> >> >public class ProducerConfig extends AbstractConfig { > > > > > >> >> > > > > > > >> >> > .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, > > > > > >> >> >"org.apache.kafka.clients.producer.ByteArraySerializer", > > > > > >> >>Importance.HIGH, > > > > > >> >> >KEY_SERIALIZER_CLASS_DOC) > > > > > >> >> > .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, > > > > > >> >> >"org.apache.kafka.clients.producer.ByteArraySerializer", > > > > > >> >>Importance.HIGH, > > > > > >> >> >VALUE_SERIALIZER_CLASS_DOC); > > > > > >> >> >} > > > > > >> >> > > > > > > >> >> >Both serializers will implement the following interface. > > > > > >> >> > > > > > > >> >> >public interface Serializer<T> extends Configurable { > > > > > >> >> > public byte[] serialize(String topic, T data, boolean > > isKey); > > > > > >> >> > > > > > > >> >> > public void close(); > > > > > >> >> >} > > > > > >> >> > > > > > > >> >> >This is more or less the same as what's in the old producer. > > The > > > > > >>slight > > > > > >> >> >differences are (1) the serializer now only requires a > > > > > >>parameter-less > > > > > >> >> >constructor; (2) the serializer has a configure() and a > > close() > > > > > >>method > > > > > >> >>for > > > > > >> >> >initialization and cleanup, respectively; (3) the serialize() > > > > method > > > > > >> >> >additionally takes the topic and an isKey indicator, both of > > which > > > > > >>are > > > > > >> >> >useful for things like schema registration. > > > > > >> >> > > > > > > >> >> >The detailed changes are included in KAFKA-1797. For > > > > completeness, I > > > > > >> >>also > > > > > >> >> >made the corresponding changes for the new java consumer api > > as > > > > > >>well. > > > > > >> >> > > > > > > >> >> >Note that the proposed api changes are incompatible with > > what's in > > > > > >>the > > > > > >> >> >0.8.2 branch. 
However, if those api changes are beneficial, > > it's > > > > > >> >>probably > > > > > >> >> >better to include them now in the 0.8.2 release, rather than > > > > later. > > > > > >> >> > > > > > > >> >> >I'd like to discuss mainly two things in this thread. > > > > > >> >> >1. Do people feel that the proposed api changes are > > reasonable? > > > > > >> >> >2. Are there any concerns of including the api changes in the > > > > 0.8.2 > > > > > >> >>final > > > > > >> >> >release? > > > > > >> >> > > > > > > >> >> >Thanks, > > > > > >> >> > > > > > > >> >> >Jun > > > > > >> >> > > > > > >> >> > > > > > >> > > > > > >> > > > > > > > > > > > > > > > > >
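[Editorial note: for readers who want to try the proposal above, here is a compile-and-run sketch of the quoted Serializer<T> interface with a minimal string implementation. Only the interface shape and the topic/isKey parameters come from the thread; the Configurable superinterface, producer wiring, and the class/method names added below (StringSerializer, SerializerSketch) are illustrative assumptions.]

```java
import java.nio.charset.StandardCharsets;

// The serializer interface as proposed in the thread (the Configurable
// superinterface is omitted to keep this sketch self-contained).
interface Serializer<T> {
    byte[] serialize(String topic, T data, boolean isKey);
    void close();
}

// A minimal implementation in the spirit of the proposal. The topic and
// isKey arguments are where things like per-topic schema registration
// could hook in for an Avro serializer.
class StringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to clean up for strings
    }
}

public class SerializerSketch {
    public static void main(String[] args) {
        Serializer<String> keySerializer = new StringSerializer();
        byte[] keyBytes = keySerializer.serialize("my-topic", "user-42", true);
        System.out.println(keyBytes.length); // prints 7
        keySerializer.close();
    }
}
```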