That's a good point about 1A - it does seem that we would need some
kind of TTL on each topic's metadata.
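
Roughly what I have in mind (a sketch only - the refresh mechanics
here are illustrative, nothing below is from the posted api):

    // Hypothetical TTL tracking for per-topic metadata. The producer
    // would re-fetch a topic's metadata once its entry expires, which
    // would also pick up newly created partitions (Jun's question
    // below).
    // (imports: java.util.HashMap, java.util.Map)
    class TopicMetadataCache {
        private final long ttlMs;
        private final Map<String, Long> lastRefreshMs = new HashMap<String, Long>();

        TopicMetadataCache(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        // True if this topic's metadata is stale and should be re-fetched.
        synchronized boolean needsRefresh(String topic, long nowMs) {
            Long last = lastRefreshMs.get(topic);
            return last == null || nowMs - last > ttlMs;
        }

        synchronized void markRefreshed(String topic, long nowMs) {
            lastRefreshMs.put(topic, nowMs);
        }
    }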

Also, WRT the ZK dependency, I don't think that decision (for the Java
client) affects other clients; i.e., other client implementations can
use whatever discovery mechanism they choose. That said, I prefer not
having a ZK dependency, for the same reasons covered earlier in this
thread.

On Thu, Jan 30, 2014 at 4:34 PM, Jun Rao <jun...@gmail.com> wrote:
> With option 1A, if we increase the # of partitions on a topic, how will
> the producer find out about newly created partitions? Do we expect the
> producer to periodically call getCluster()?
>
> As for the ZK dependency, one of the goals of the client rewrite is to
> reduce dependencies so that one can implement the client in languages
> other than Java. A ZK client is only available in a small number of
> languages.
>
> Thanks,
>
> Jun
>
>
> On Fri, Jan 24, 2014 at 5:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> Clark and all,
>>
>> I thought a little bit about the serialization question. Here are the
>> options I see and the pros and cons I can think of. I'd love to hear
>> people's preferences if you have a strong one.
>>
>> One important consideration is that however the producer works will
>> also need to be how the new consumer works (which we hope to write
>> next). That is, if you put objects in, you should get objects out. So
>> we need to think through both sides.
>>
>> Options:
>>
>> Option 0: What is in the existing scala code and the java code I
>> posted--Serializer and Partitioner plugin provided by the user via config.
>> Partitioner has a sane default, but Serializer needs to be specified in
>> config.
>>
>> Pros: How it works today in the scala code.
>> Cons: You have to adapt your serialization library of choice to our
>> interfaces. The reflective class loading means a typo in the serializer
>> name gives odd errors. Likewise there is little type safety--the
>> ProducerRecord takes Object, and any type mismatch between the object
>> provided and the serializer only shows up at runtime.
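>>
>> For concreteness, option 0 usage looks roughly like this (a sketch;
>> the config key names and constructor are illustrative, not necessarily
>> what the posted prototype uses):
>>
>>     Properties props = new Properties();
>>     // Both plugins are instantiated reflectively from class names, so
>>     // a typo here only surfaces as an error at runtime:
>>     props.put("serializer.class", "com.example.MyAvroSerializer");
>>     props.put("partitioner.class", "com.example.MyPartitioner"); // has a sane default
>>     KafkaProducer producer = new KafkaProducer(props);
>>     // key and value are Objects; a mismatch with the configured
>>     // serializer also fails only at runtime.
>>     producer.send(new ProducerRecord("my-topic", myKey, myValue));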
>>
>> Option 1: No plugins
>>
>> This would mean byte[] key, byte[] value, and partitioning done by the
>> client by passing in a partition *number* directly.
>>
>> The problem with this is that it is tricky to compute the partition
>> correctly and probably most people won't. We could add a getCluster()
>> method to return the Cluster instance you should use for partitioning.
>> But I suspect people would be lazy and not use that, and instead
>> hard-code partitions, which would break if partitions were added or if
>> they hard-coded them wrong. In my experience 3 partitioning strategies
>> cover like 99% of cases, so not having a default implementation for
>> this makes the common case harder. Left to their own devices people
>> will use bad hash functions and get weird results.
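>>
>> In code, option 1 puts all of this on the user (a sketch; getCluster()
>> is the method proposed above, but partitionsFor() and the rest of the
>> shapes are illustrative):
>>
>>     // (imports: java.util.Arrays)
>>     byte[] keyBytes = mySerializer.serialize(myKey);
>>     byte[] valueBytes = mySerializer.serialize(myValue);
>>     // The user must compute the partition themselves, e.g. by hashing
>>     // the key bytes over the current partition count:
>>     Cluster cluster = producer.getCluster();
>>     int numPartitions = cluster.partitionsFor("my-topic").size();
>>     int partition = (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
>>     producer.send(new ProducerRecord("my-topic", partition, keyBytes, valueBytes));
>>
>> (Even here the naive Math.abs(hash) % n version is a classic bug,
>> since Math.abs(Integer.MIN_VALUE) is still negative--exactly the kind
>> of thing people will get wrong.)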
>>
>> Option 1A: Alternatively we could partition by the key using the
>> existing default partitioning strategy (which only uses the byte[]
>> anyway), but instead of having a partitionKey we could have a numerical
>> partition override and add the getCluster() method to get the cluster
>> metadata. That would make custom partitioning possible but handle the
>> common case simply.
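>>
>> So under 1A the common case stays a one-liner and the escape hatch is
>> an explicit partition number (a sketch; the constructor overloads and
>> myRangePartitioner are illustrative):
>>
>>     // Common case: partition chosen by hashing the serialized key.
>>     producer.send(new ProducerRecord("my-topic", keyBytes, valueBytes));
>>
>>     // Custom partitioning: compute a partition from cluster metadata
>>     // and pass the number in explicitly as the override.
>>     Cluster cluster = producer.getCluster();
>>     int partition = myRangePartitioner.partition(myKey, cluster);
>>     producer.send(new ProducerRecord("my-topic", partition, keyBytes, valueBytes));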
>>
>> Option 2: Partitioner plugin remains, serializers go.
>>
>> The problem here is that the partitioner might lose access to the
>> deserialized key which would occasionally be useful for semantic
>> partitioning schemes. The Partitioner could deserialize the key but that
>> would be inefficient and weird.
>>
>> This problem could be fixed by having key and value be byte[] but
>> retaining partitionKey as an Object and passing it to the partitioner
>> as is. Then if you have a partitioner which requires the deserialized
>> key you would need to use this partition key. One weird side effect is
>> that if you want to have a custom partition key BUT want to partition
>> by the bytes of that key rather than the object value, you must write a
>> custom partitioner and serialize it yourself.
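>>
>> Concretely, option 2 with that fix would look something like this
>> (a sketch; the exact interface shape is illustrative):
>>
>>     interface Partitioner {
>>         // partitionKey is the user's original object, passed through
>>         // unserialized purely for routing; key and value are already
>>         // byte[] by the time this is called.
>>         int partition(Object partitionKey, byte[] key, Cluster cluster);
>>     }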
>>
>> Of these I think I prefer 1A but could be convinced of 0 since that is how
>> it works now.
>>
>> Thoughts?
>>
>> -Jay
>>
>>
>> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>>
>> > Jay - Thanks for the call for comments. Here's some initial input:
>> >
>> > - Make message serialization a client responsibility (making all messages
>> > byte[]). Reflection-based loading makes it harder to use generic
>> > codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up a codec
>> > programmatically. Non-default partitioning should require an explicit
>> > partition key.
>> >
>> > - I really like the fact that it will be native Java. Please consider
>> > using native maven and not sbt, gradle, ivy, etc., as they don't
>> > reliably play nice in the maven ecosystem. A jar without a well-formed
>> > pom doesn't feel like a real artifact. The poms generated by sbt et
>> > al. are not well formed. Using maven will make builds and IDE
>> > integration much smoother.
>> >
>> > - Look at Nick Telford's dropwizard-extras package, in which he
>> > defines some Jackson-compatible POJOs for loading configuration. Seems
>> > like your client migration is similar. The config objects should have
>> > constructors or factories that accept Map<String, String> and
>> > Properties for ease of migration.
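>> >
>> > Something like this shape (a sketch, not the actual prototype code;
>> > imports: java.util.HashMap, java.util.Map, java.util.Properties):
>> >
>> >     public class ProducerConfig {
>> >         private final Map<String, String> props;
>> >
>> >         public ProducerConfig(Map<String, String> props) {
>> >             this.props = new HashMap<String, String>(props);
>> >         }
>> >
>> >         // Properties overload for easy migration from the old client.
>> >         public ProducerConfig(Properties props) {
>> >             this.props = new HashMap<String, String>();
>> >             for (String name : props.stringPropertyNames())
>> >                 this.props.put(name, props.getProperty(name));
>> >         }
>> >     }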
>> >
>> > - Would you consider using the org.apache.kafka package for the new
>> > API? (quibble)
>> >
>> > - Why create your own futures rather than use
>> > java.util.concurrent.Future<Long> or similar? Standard futures will
>> > play nice with other reactive libs and things like J8's
>> > CompletableFuture.
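>> >
>> > i.e. something like this would come for free (a sketch, assuming
>> > send() returned a standard Future of the offset; producer, key, and
>> > value as elsewhere in this thread):
>> >
>> >     // (imports: java.util.concurrent.Future, java.util.concurrent.TimeUnit)
>> >     Future<Long> offset = producer.send(new ProducerRecord("my-topic", key, value));
>> >     long o = offset.get(5, TimeUnit.SECONDS); // standard timeout/cancellation semantics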
>> >
>> > Thanks again,
>> > C
>> >
>> >
>> >
>> > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com
>> > >wrote:
>> >
>> > > A couple comments:
>> > >
>> > > 1) Why does the config use a broker list instead of discovering the
>> > > brokers in ZooKeeper?  It doesn't match the HighLevelConsumer API.
>> > >
>> > > 2) It looks like broker connections are created on demand.  I'm
>> > > wondering if sometimes you might want to flush out config or network
>> > > connectivity issues before pushing the first message through.
>> > >
>> > > Should there also be a KafkaProducer.connect() or .open() method, or
>> > > a connectAll()?  I guess it would try to connect to all brokers in
>> > > the BROKER_LIST_CONFIG.
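>> > >
>> > > i.e. something like this at startup (a sketch of a hypothetical
>> > > method, not anything in the posted API):
>> > >
>> > >     KafkaProducer producer = new KafkaProducer(config);
>> > >     // Hypothetical: fail fast on bad config or unreachable brokers
>> > >     // here, rather than on the first send().
>> > >     producer.connectAll();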
>> > >
>> > > HTH,
>> > >
>> > > Roger
>> > >
>> > >
>> > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
>> wrote:
>> > >
>> > > > As mentioned in a previous email we are working on a
>> > > > re-implementation of the producer. I would like to use this email
>> > > > thread to discuss the details of the public API and the
>> > > > configuration. I would love for us to be incredibly picky about
>> > > > this public api now so it is as good as possible and we don't
>> > > > need to break it in the future.
>> > > >
>> > > > The best way to get a feel for the API is actually to take a look
>> > > > at the javadoc; my hope is to get the api docs good enough that
>> > > > they are self-explanatory:
>> > > >
>> > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>> > > >
>> > > > Please take a look at this API and give me any thoughts you may have!
>> > > >
>> > > > It may also be reasonable to take a look at the configs:
>> > > >
>> > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>> > > >
>> > > > The actual code is posted here:
>> > > > https://issues.apache.org/jira/browse/KAFKA-1227
>> > > >
>> > > > A few questions or comments to kick things off:
>> > > > 1. We need to make a decision on whether serialization of the
>> > > > user's key and value should be done by the user (with our api
>> > > > just taking byte[]) or if we should take an object and allow the
>> > > > user to configure a Serializer class which we instantiate via
>> > > > reflection. We take the latter approach in the current producer,
>> > > > and I have carried this through to this prototype. The tradeoff I
>> > > > see is this: taking byte[] is actually simpler, and the user can
>> > > > directly do whatever serialization they like. The complication is
>> > > > actually partitioning. Currently partitioning is done by a
>> > > > similar plug-in api (Partitioner) which the user can implement
>> > > > and configure to override how partitions are assigned. If we take
>> > > > byte[] as input then we have no access to the original object and
>> > > > partitioning MUST be done on the byte[]. This is fine for hash
>> > > > partitioning. However for various types of semantic partitioning
>> > > > (range partitioning, or whatever) you would want access to the
>> > > > original object. In the current approach a producer who wishes to
>> > > > send byte[] they have serialized in their own code can configure
>> > > > the BytesSerialization we supply, which is just a "no op"
>> > > > serialization. (The two alternatives are sketched side by side
>> > > > after this list.)
>> > > > 2. We should obsess over naming and make sure each of the class
>> > > > names is good.
>> > > > 3. Jun has already pointed out that we need to include the topic
>> > > > and partition in the response, which is absolutely right. I
>> > > > haven't done that yet but it definitely needs to be there.
>> > > > 4. Currently RecordSend.await will throw an exception if the
>> > > > request failed. The intention here is that
>> > > > producer.send(message).await() exactly simulates a synchronous
>> > > > call (see the second sketch after this list). Guozhang has noted
>> > > > that this is a little annoying since the user must then catch
>> > > > exceptions. However, if we remove this, then if the user doesn't
>> > > > check for errors they won't know one has occurred, which I
>> > > > predict will be a common mistake.
>> > > > 5. Perhaps there is more we could do to make the async callbacks
>> > > > and future we give back intuitive and easy to program against?
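>> > > >
>> > > > For point 1, here are the two alternatives side by side (a
>> > > > sketch; type and config names are illustrative):
>> > > >
>> > > >     // (a) byte[] api: the user serializes, but must also handle
>> > > >     // partitioning somehow
>> > > >     producer.send(new ProducerRecord("topic", keyBytes, valueBytes));
>> > > >
>> > > >     // (b) plugin api: objects in, Serializer loaded reflectively
>> > > >     // from config
>> > > >     props.put("serializer.class", "com.example.MySerializer");
>> > > >     producer.send(new ProducerRecord("topic", myKeyObject, myValueObject));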
>> > > >
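>> > > > And the sync vs. async usage from point 4 (a sketch; the exact
>> > > > RecordSend method names may differ from the posted javadoc):
>> > > >
>> > > >     // Async: returns immediately; an error would surface later
>> > > >     // through the returned object (and is easy to never check).
>> > > >     RecordSend send = producer.send(new ProducerRecord("topic", key, value));
>> > > >
>> > > >     // Sync: await() blocks until the request completes and throws
>> > > >     // if it failed, so this behaves like a synchronous send.
>> > > >     send.await();
>> > > >     long offset = send.offset();
>> > > >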
>> > > > Some background info on implementation:
>> > > >
>> > > > At a high level the primary difference in this producer is that
>> > > > it removes the distinction between the "sync" and "async"
>> > > > producer. Effectively all requests are sent asynchronously but
>> > > > always return a future response object that gives the offset, as
>> > > > well as any error that may have occurred, when the request is
>> > > > complete. The batching that is done only in the async producer
>> > > > today is now done whenever possible. This means that the sync
>> > > > producer, under load, can get performance as good as the async
>> > > > producer (preliminary results show the producer getting 1m
>> > > > messages/sec). This works similarly to group commit in databases
>> > > > but with respect to the actual network transmission--any messages
>> > > > that arrive while a send is in progress are batched together. It
>> > > > is also possible to encourage batching even under low load, to
>> > > > save server resources, by introducing a delay on the send to
>> > > > allow more messages to accumulate; this is done using the
>> > > > linger.ms config (similar to Nagle's algorithm in TCP).
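>> > > >
>> > > > e.g. (a sketch; linger.ms is the real config, the constructor and
>> > > > other handling are illustrative):
>> > > >
>> > > >     Properties props = new Properties();
>> > > >     // Wait up to 5ms for more records to accumulate into a batch
>> > > >     // before sending, even when lightly loaded.
>> > > >     props.put("linger.ms", "5");
>> > > >     KafkaProducer producer = new KafkaProducer(props);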
>> > > >
>> > > > This producer does all network communication asynchronously and
>> > > > in parallel to all servers, so the performance penalty for
>> > > > acks=-1 and waiting on replication should be much reduced. I
>> > > > haven't done much benchmarking on this yet, though.
>> > > >
>> > > > The high level design is described a little here, though this is
>> > > > now a little out of date:
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>> > > >
>> > > > -Jay
>> > > >
>> > >
>> >
>>
