Jun,

Thanks for summarizing this - it helps confirm for me that I did not
misunderstand anything in this thread so far; and that I disagree with
the premise that the steps in using the current byte-oriented API is
cumbersome or inflexible. It involves instantiating the K-V
serializers in code (as opposed to config) and a extra (but explicit
- i.e., making it very clear to the user) but simple call to serialize
before sending.

The point about downstream queries breaking can happen just as well
with the implicit serializers/deserializers - since ultimately people
have to instantiate the specific type in their code and if they want
to send it they will.

I think adoption is also equivalent since people will just instantiate
whatever serializer/deserializer they want in one line. Plugging in a
new serializer implementation does require a code change, but that can
also be avoided via a config driven factory.

So I'm still +0 on the change but I'm definitely not against moving
forward with the changes. i.e., unless there is any strong -1 on the
proposal from anyone else.

Thanks,

Joel

> With a byte array interface, of course there is nothing that one can't do.
> However, the real question is that whether we want to encourage people to
> use it this way or not. Being able to flow just bytes is definitely easier
> to get started. That's why many early adopters choose to do it that way.
> However, it's often the case that they start feeling the pain later when
> some producers change the data format. Their Hive/Pig queries start to
> break and it's a painful process to have the issue fixed. So, the purpose
> of this api change is really to encourage people to standardize on a single
> serializer/deserializer that supports things like data validation and
> schema evolution upstream in the producer. Now, suppose there is an Avro
> serializer/deserializer implementation. How do we make it easy for people
> to adopt? If the serializer is part of the api, we can just say, wire in
> the Avro serializer for key and/or value in the config and then you can
> start sending Avro records to the producer. If the serializer is not part
> of the api, we have to say, first instantiate a key and/or value serializer
> this way, send the key to the key serializer to get the key bytes, send the
> value to the value serializer to get the value bytes, and finally send the
> bytes to the producer. The former will be simpler and likely makes the
> adoption easier.
> 
> Thanks,
> 
> Jun
> 
> On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > Documentation is inevitable even if the serializer/deserializer is
> > part of the API - since the user has to set it up in the configs. So
> > again, you can only encourage people to use it through documentation.
> > The simpler byte-oriented API seems clearer to me because anyone who
> > needs to send (or receive) a specific data type will _be forced to_
> > (or actually, _intuitively_) select a serializer (or deserializer) and
> > will definitely pick an already available implementation if a good one
> > already exists.
> >
> > Sorry I still don't get it and this is really the only sticking point
> > for me, albeit a minor one (which is why I have been +0 all along on
> > the change). I (and I think many others) would appreciate it if
> > someone can help me understand this better.  So I will repeat the
> > question: What "usage pattern" cannot be supported by easily by the
> > simpler API without adding burden on the user?
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote:
> > > Joel,
> > >
> > > It's just that if the serializer/deserializer is not part of the API, you
> > > can only encourage people to use it through documentation. However, not
> > > everyone will read the documentation if it's not directly used in the
> > API.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > > > (sorry about the late follow-up late - I'm traveling most of this
> > > > month)
> > > >
> > > > I'm likely missing something obvious, but I find the following to be a
> > > > somewhat vague point that has been mentioned more than once in this
> > > > thread without a clear explanation. i.e., why is it hard to share a
> > > > serializer/deserializer implementation and just have the clients call
> > > > it before a send/receive? What "usage pattern" cannot be supported by
> > > > the simpler API?
> > > >
> > > > > 1. Can we keep the serialization semantics outside the Producer
> > interface
> > > > > and have simple bytes in / bytes out for the interface (This is what
> > we
> > > > > have today).
> > > > >
> > > > > The points for this is to keep the interface simple and usage easy to
> > > > > understand. The points against this is that it gets hard to share
> > common
> > > > > usage patterns around serialization/message validations for the
> > future.
> > > >
> > > >
> > > > On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote:
> > > > > Thank you Jay. I agree with the issue that you point w.r.t paired
> > > > > serializers. I also think having mix serialization types is rare. To
> > get
> > > > > the current behavior, one can simply use a ByteArraySerializer. This
> > is
> > > > > best understood by talking with many customers and you seem to have
> > done
> > > > > that. I am convinced about the change.
> > > > >
> > > > > For the rest who gave -1 or 0 for this proposal, does the answers
> > for the
> > > > > three points(updated) below seem reasonable? Are these explanations
> > > > > convincing?
> > > > >
> > > > >
> > > > > 1. Can we keep the serialization semantics outside the Producer
> > interface
> > > > > and have simple bytes in / bytes out for the interface (This is what
> > we
> > > > > have today).
> > > > >
> > > > > The points for this is to keep the interface simple and usage easy to
> > > > > understand. The points against this is that it gets hard to share
> > common
> > > > > usage patterns around serialization/message validations for the
> > future.
> > > > >
> > > > > 2. Can we create a wrapper producer that does the serialization and
> > have
> > > > > different variants of it for different data formats?
> > > > >
> > > > > The points for this is again to keep the main API clean. The points
> > > > > against this is that it duplicates the API, increases the surface
> > area
> > > > and
> > > > > creates redundancy for a minor addition.
> > > > >
> > > > > 3. Do we need to support different data types per record? The current
> > > > > interface (bytes in/bytes out) lets you instantiate one producer and
> > use
> > > > > it to send multiple data formats. There seems to be some valid use
> > cases
> > > > > for this.
> > > > >
> > > > >
> > > > > Mixed serialization types are rare based on interactions with
> > customers.
> > > > > To get the current behavior, one can simply use a
> > ByteArraySerializer.
> > > > >
> > > > > On 12/5/14 5:00 PM, "Jay Kreps" <j...@confluent.io> wrote:
> > > > >
> > > > > >Hey Sriram,
> > > > > >
> > > > > >Thanks! I think this is a very helpful summary.
> > > > > >
> > > > > >Let me try to address your point about passing in the serde at send
> > > > time.
> > > > > >
> > > > > >I think the first objection is really to the paired key/value
> > serializer
> > > > > >interfaces. This leads to kind of a weird combinatorial thing where
> > you
> > > > > >would have an avro/avro serializer a string/avro serializer, a pb/pb
> > > > > >serializer, and a string/pb serializer, and so on. But your proposal
> > > > would
> > > > > >work as well with separate serializers for key and value.
> > > > > >
> > > > > >I think the downside is just the one you call out--that this is a
> > corner
> > > > > >case and you end up with two versions of all the apis to support it.
> > > > This
> > > > > >also makes the serializer api more annoying to implement. I think
> > the
> > > > > >alternative solution to this case and any other we can give people
> > is
> > > > just
> > > > > >configuring ByteArraySerializer which gives you basically the api
> > that
> > > > you
> > > > > >have now with byte arrays. If this is incredibly common then this
> > would
> > > > be
> > > > > >a silly solution, but I guess the belief is that these cases are
> > rare
> > > > and
> > > > > >a
> > > > > >really well implemented avro or json serializer should be 100% of
> > what
> > > > > >most
> > > > > >people need.
> > > > > >
> > > > > >In practice the cases that actually mix serialization types in a
> > single
> > > > > >stream are pretty rare I think just because the consumer then has
> > the
> > > > > >problem of guessing how to deserialize, so most of these will end up
> > > > with
> > > > > >at least some marker or schema id or whatever that tells you how to
> > read
> > > > > >the data. Arguable this mixed serialization with marker is itself a
> > > > > >serializer type and should have a serializer of its own...
> > > > > >
> > > > > >-Jay
> > > > > >
> > > > > >On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> > > > > >srsubraman...@linkedin.com.invalid> wrote:
> > > > > >
> > > > > >> This thread has diverged multiple times now and it would be worth
> > > > > >> summarizing them.
> > > > > >>
> > > > > >> There seems to be the following points of discussion -
> > > > > >>
> > > > > >> 1. Can we keep the serialization semantics outside the Producer
> > > > > >>interface
> > > > > >> and have simple bytes in / bytes out for the interface (This is
> > what
> > > > we
> > > > > >> have today).
> > > > > >>
> > > > > >> The points for this is to keep the interface simple and usage
> > easy to
> > > > > >> understand. The points against this is that it gets hard to share
> > > > common
> > > > > >> usage patterns around serialization/message validations for the
> > > > future.
> > > > > >>
> > > > > >> 2. Can we create a wrapper producer that does the serialization
> > and
> > > > have
> > > > > >> different variants of it for different data formats?
> > > > > >>
> > > > > >> The points for this is again to keep the main API clean. The
> > points
> > > > > >> against this is that it duplicates the API, increases the surface
> > area
> > > > > >>and
> > > > > >> creates redundancy for a minor addition.
> > > > > >>
> > > > > >> 3. Do we need to support different data types per record? The
> > current
> > > > > >> interface (bytes in/bytes out) lets you instantiate one producer
> > and
> > > > use
> > > > > >> it to send multiple data formats. There seems to be some valid use
> > > > cases
> > > > > >> for this.
> > > > > >>
> > > > > >> I have still not seen a strong argument against not having this
> > > > > >> functionality. Can someone provide their views on why we don't
> > need
> > > > this
> > > > > >> support that is possible with the current API?
> > > > > >>
> > > > > >> One possible approach for the per record serialization would be to
> > > > > >>define
> > > > > >>
> > > > > >> public interface SerDe<K,V> {
> > > > > >>   public byte[] serializeKey();
> > > > > >>
> > > > > >>   public K deserializeKey();
> > > > > >>
> > > > > >>   public byte[] serializeValue();
> > > > > >>
> > > > > >>   public V deserializeValue();
> > > > > >> }
> > > > > >>
> > > > > >> This would be used by both the Producer and the Consumer.
> > > > > >>
> > > > > >> The send APIs can then be
> > > > > >>
> > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > > Callback
> > > > > >> callback);
> > > > > >>
> > > > > >>
> > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > > > >>SerDe<K,V>
> > > > > >> serde);
> > > > > >>
> > > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > > > >>SerDe<K,V>
> > > > > >> serde, Callback callback);
> > > > > >>
> > > > > >>
> > > > > >> A default SerDe can be set in the config. The producer would use
> > the
> > > > > >> default from the config if the non-serde send APIs are used. The
> > > > > >>downside
> > > > > >> to this approach is that we would need to have four variants of
> > Send
> > > > API
> > > > > >> for the Producer.
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:
> > > > > >>
> > > > > >> >Jiangjie,
> > > > > >> >
> > > > > >> >The issue with adding the serializer in ProducerRecord is that
> > you
> > > > > >>need to
> > > > > >> >implement all combinations of serializers for key and value. So,
> > > > > >>instead
> > > > > >> >of
> > > > > >> >just implementing int and string serializers, you will have to
> > > > > >>implement
> > > > > >> >all 4 combinations.
> > > > > >> >
> > > > > >> >Adding a new producer constructor like Producer<K,
> > > > V>(KeySerializer<K>,
> > > > > >> >ValueSerializer<V>, Properties properties) can be useful.
> > > > > >> >
> > > > > >> >Thanks,
> > > > > >> >
> > > > > >> >Jun
> > > > > >> >
> > > > > >> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin
> > > > > >><j...@linkedin.com.invalid>
> > > > > >> >wrote:
> > > > > >> >
> > > > > >> >>
> > > > > >> >> I'm just thinking instead of binding serialization with
> > producer,
> > > > > >> >>another
> > > > > >> >> option is to bind serializer/deserializer with
> > > > > >> >> ProducerRecord/ConsumerRecord (please see the detail proposal
> > > > below.)
> > > > > >> >>            The arguments for this option is:
> > > > > >> >>         A. A single producer could send different message
> > types.
> > > > > >>There
> > > > > >> >>are
> > > > > >> >> several use cases in LinkedIn for per record serializer
> > > > > >> >>         - In Samza, there are some in-stream order-sensitive
> > > > control
> > > > > >> >> messages
> > > > > >> >> having different deserializer from other messages.
> > > > > >> >>         - There are use cases which need support for sending
> > both
> > > > > >>Avro
> > > > > >> >> messages
> > > > > >> >> and raw bytes.
> > > > > >> >>         - Some use cases needs to deserialize some Avro
> > messages
> > > > into
> > > > > >> >> generic
> > > > > >> >> record and some other messages into specific record.
> > > > > >> >>         B. In current proposal, the serializer/deserilizer is
> > > > > >> >>instantiated
> > > > > >> >> according to config. Compared with that, binding serializer
> > with
> > > > > >> >> ProducerRecord and ConsumerRecord is less error prone.
> > > > > >> >>
> > > > > >> >>
> > > > > >> >>         This option includes the following changes:
> > > > > >> >>         A. Add serializer and deserializer interfaces to
> > replace
> > > > > >> >>serializer
> > > > > >> >> instance from config.
> > > > > >> >>                 Public interface Serializer <K, V> {
> > > > > >> >>                         public byte[] serializeKey(K key);
> > > > > >> >>                         public byte[] serializeValue(V value);
> > > > > >> >>                 }
> > > > > >> >>                 Public interface deserializer <K, V> {
> > > > > >> >>                         Public K deserializeKey(byte[] key);
> > > > > >> >>                         public V deserializeValue(byte[]
> > value);
> > > > > >> >>                 }
> > > > > >> >>
> > > > > >> >>         B. Make ProducerRecord and ConsumerRecord abstract
> > class
> > > > > >> >> implementing
> > > > > >> >> Serializer <K, V> and Deserializer <K, V> respectively.
> > > > > >> >>                 Public abstract class ProducerRecord <K, V>
> > > > > >>implements
> > > > > >> >> Serializer <K, V>
> > > > > >> >> {...}
> > > > > >> >>                 Public abstract class ConsumerRecord <K, V>
> > > > > >>implements
> > > > > >> >> Deserializer <K,
> > > > > >> >> V> {...}
> > > > > >> >>
> > > > > >> >>         C. Instead of instantiate the serializer/Deserializer
> > from
> > > > > >> >>config,
> > > > > >> >> let
> > > > > >> >> concrete ProducerRecord/ConsumerRecord extends the abstract
> > class
> > > > and
> > > > > >> >> override the serialize/deserialize methods.
> > > > > >> >>
> > > > > >> >>                 Public class AvroProducerRecord extends
> > > > > >>ProducerRecord
> > > > > >> >> <String,
> > > > > >> >> GenericRecord> {
> > > > > >> >>                         ...
> > > > > >> >>                         @Override
> > > > > >> >>                         Public byte[] serializeKey(String key)
> > {Š}
> > > > > >> >>                         @Override
> > > > > >> >>                         public byte[]
> > serializeValue(GenericRecord
> > > > > >> >>value);
> > > > > >> >>                 }
> > > > > >> >>
> > > > > >> >>                 Public class AvroConsumerRecord extends
> > > > > >>ConsumerRecord
> > > > > >> >> <String,
> > > > > >> >> GenericRecord> {
> > > > > >> >>                         ...
> > > > > >> >>                         @Override
> > > > > >> >>                         Public K deserializeKey(byte[] key) {Š}
> > > > > >> >>                         @Override
> > > > > >> >>                         public V deserializeValue(byte[]
> > value);
> > > > > >> >>                 }
> > > > > >> >>
> > > > > >> >>         D. The producer API changes to
> > > > > >> >>                 Public class KafkaProducer {
> > > > > >> >>                         ...
> > > > > >> >>
> > > > > >> >>                         Future<RecordMetadata> send
> > (ProducerRecord
> > > > > >><K,
> > > > > >> >>V>
> > > > > >> >> record) {
> > > > > >> >>                                 ...
> > > > > >> >>                                 K key =
> > > > > >>record.serializeKey(record.key);
> > > > > >> >>                                 V value =
> > > > > >> >> record.serializedValue(record.value);
> > > > > >> >>                                 BytesProducerRecord
> > > > > >>bytesProducerRecord
> > > > > >> >>=
> > > > > >> >> new
> > > > > >> >> BytesProducerRecord(topic, partition, key, value);
> > > > > >> >>                                 ...
> > > > > >> >>                         }
> > > > > >> >>                         ...
> > > > > >> >>                 }
> > > > > >> >>
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> We also had some brainstorm in LinkedIn and here are the
> > feedbacks:
> > > > > >> >>
> > > > > >> >> If the community decide to add the serialization back to new
> > > > > >>producer,
> > > > > >> >> besides current proposal which changes new producer API to be a
> > > > > >> >>template,
> > > > > >> >> there are some other options raised during our discussion:
> > > > > >> >>         1) Rather than change current new producer API, we can
> > > > > >>provide a
> > > > > >> >> wrapper
> > > > > >> >> of current new producer (e.g. KafkaSerializedProducer) and
> > make it
> > > > > >> >> available to users. As there is value in the simplicity of
> > current
> > > > > >>API.
> > > > > >> >>
> > > > > >> >>         2) If we decide to go with tempalated new producer API,
> > > > > >> >>according
> > > > > >> >> to
> > > > > >> >> experience in LinkedIn, it might worth considering to
> > instantiate
> > > > the
> > > > > >> >> serializer in code instead of from config so we can avoid
> > runtime
> > > > > >>errors
> > > > > >> >> due to dynamic instantiation from config, which is more error
> > > > prone.
> > > > > >>If
> > > > > >> >> that is the case, the producer API could be changed to
> > something
> > > > > >>like:
> > > > > >> >>                 producer = new Producer<K, V>(KeySerializer<K>,
> > > > > >> >> ValueSerializer<V>)
> > > > > >> >>
> > > > > >> >> --Jiangjie (Becket) Qin
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > > > >> >>
> > > > > >> >> >Hi, Everyone,
> > > > > >> >> >
> > > > > >> >> >I'd like to start a discussion on whether it makes sense to
> > add
> > > > the
> > > > > >> >> >serializer api back to the new java producer. Currently, the
> > new
> > > > > >>java
> > > > > >> >> >producer takes a byte array for both the key and the value.
> > While
> > > > > >>this
> > > > > >> >>api
> > > > > >> >> >is simple, it pushes the serialization logic into the
> > application.
> > > > > >>This
> > > > > >> >> >makes it hard to reason about what type of data is being sent
> > to
> > > > > >>Kafka
> > > > > >> >>and
> > > > > >> >> >also makes it hard to share an implementation of the
> > serializer.
> > > > For
> > > > > >> >> >example, to support Avro, the serialization logic could be
> > quite
> > > > > >> >>involved
> > > > > >> >> >since it might need to register the Avro schema in some remote
> > > > > >>registry
> > > > > >> >> >and
> > > > > >> >> >maintain a schema cache locally, etc. Without a serialization
> > api,
> > > > > >>it's
> > > > > >> >> >impossible to share such an implementation so that people can
> > > > easily
> > > > > >> >> >reuse.
> > > > > >> >> >We sort of overlooked this implication during the initial
> > > > > >>discussion of
> > > > > >> >> >the
> > > > > >> >> >producer api.
> > > > > >> >> >
> > > > > >> >> >So, I'd like to propose an api change to the new producer by
> > > > adding
> > > > > >> >>back
> > > > > >> >> >the serializer api similar to what we had in the old producer.
> > > > > >> >>Specially,
> > > > > >> >> >the proposed api changes are the following.
> > > > > >> >> >
> > > > > >> >> >First, we change KafkaProducer to take generic types K and V
> > for
> > > > the
> > > > > >> >>key
> > > > > >> >> >and the value, respectively.
> > > > > >> >> >
> > > > > >> >> >public class KafkaProducer<K,V> implements Producer<K,V> {
> > > > > >> >> >
> > > > > >> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V>
> > record,
> > > > > >> >> >Callback
> > > > > >> >> >callback);
> > > > > >> >> >
> > > > > >> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V>
> > > > record);
> > > > > >> >> >}
> > > > > >> >> >
> > > > > >> >> >Second, we add two new configs, one for the key serializer and
> > > > > >>another
> > > > > >> >>for
> > > > > >> >> >the value serializer. Both serializers will default to the
> > byte
> > > > > >>array
> > > > > >> >> >implementation.
> > > > > >> >> >
> > > > > >> >> >public class ProducerConfig extends AbstractConfig {
> > > > > >> >> >
> > > > > >> >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > > >> >> >"org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > > >> >>Importance.HIGH,
> > > > > >> >> >KEY_SERIALIZER_CLASS_DOC)
> > > > > >> >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > > >> >> >"org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > > >> >>Importance.HIGH,
> > > > > >> >> >VALUE_SERIALIZER_CLASS_DOC);
> > > > > >> >> >}
> > > > > >> >> >
> > > > > >> >> >Both serializers will implement the following interface.
> > > > > >> >> >
> > > > > >> >> >public interface Serializer<T> extends Configurable {
> > > > > >> >> >    public byte[] serialize(String topic, T data, boolean
> > isKey);
> > > > > >> >> >
> > > > > >> >> >    public void close();
> > > > > >> >> >}
> > > > > >> >> >
> > > > > >> >> >This is more or less the same as what's in the old producer.
> > The
> > > > > >>slight
> > > > > >> >> >differences are (1) the serializer now only requires a
> > > > > >>parameter-less
> > > > > >> >> >constructor; (2) the serializer has a configure() and a
> > close()
> > > > > >>method
> > > > > >> >>for
> > > > > >> >> >initialization and cleanup, respectively; (3) the serialize()
> > > > method
> > > > > >> >> >additionally takes the topic and an isKey indicator, both of
> > which
> > > > > >>are
> > > > > >> >> >useful for things like schema registration.
> > > > > >> >> >
> > > > > >> >> >The detailed changes are included in KAFKA-1797. For
> > > > completeness, I
> > > > > >> >>also
> > > > > >> >> >made the corresponding changes for the new java consumer api
> > as
> > > > > >>well.
> > > > > >> >> >
> > > > > >> >> >Note that the proposed api changes are incompatible with
> > what's in
> > > > > >>the
> > > > > >> >> >0.8.2 branch. However, if those api changes are beneficial,
> > it's
> > > > > >> >>probably
> > > > > >> >> >better to include them now in the 0.8.2 release, rather than
> > > > later.
> > > > > >> >> >
> > > > > >> >> >I'd like to discuss mainly two things in this thread.
> > > > > >> >> >1. Do people feel that the proposed api changes are
> > reasonable?
> > > > > >> >> >2. Are there any concerns of including the api changes in the
> > > > 0.8.2
> > > > > >> >>final
> > > > > >> >> >release?
> > > > > >> >> >
> > > > > >> >> >Thanks,
> > > > > >> >> >
> > > > > >> >> >Jun
> > > > > >> >>
> > > > > >> >>
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > > >
> >
> >

Reply via email to