Hey Joe,

Metadata: Yes, this is how it works. You give a URL or a few URLs to
bootstrap from. From then on any metadata change will percolate up to all
producers so you should be able to dynamically change the cluster in any
way without needing to restart or reconfigure the producers. So I think you
are happy there...

For serializers and partitioners. Currently there is a default partitioner
that partitions by whatever key you supply. There is no default serializer
you MUST specify this in the configuration for the producer. The reason for
requiring the user to specify it is that it is really confusing if we
default to some default serializer because then when you forget to
configure the serializer you get this weird class cast exception from using
the wrong serializer with the wrong data. The other alternatives are what
Clark mentioned do you prefer the current approach to that?

-Jay


On Fri, Jan 24, 2014 at 4:59 PM, Joseph Lawson <jlaw...@roomkey.com> wrote:

> My 2 cents:
>
> Getting the broker metadata via active brokers is the way to go. It allows
> one to dynamically rebalance or introduce a whole new set of servers into a
> cluster just by adding them to the cluster and migrating partitions. We use
> this to periodically introduce newer Kafka cluster cloudformations into our
> running kafka cluster and retire an old Kafka broker cloudformation without
> disrupting the consumers or producers.
>
> I'd rather have the partition action done as is with a default serializer
> that takes a key and does the balancing. I can feed data to partitions such
> as user sessions using a cookie id unique to their session and know any
> partition will have the whole session if I want to practice on the stream.
> Also there is the option to include a custom serializer via passing it into
> the producer constructor so I feel like having to include my own every time
> is a step backwards. I might be misunderstanding this.
>
> Joe Lawson
>
> On Jan 24, 2014 2:54 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> As mentioned in a previous email we are working on a re-implementation of
> the producer. I would like to use this email thread to discuss the details
> of the public API and the configuration. I would love for us to be
> incredibly picky about this public api now so it is as good as possible and
> we don't need to break it in the future.
>
> The best way to get a feel for the API is actually to take a look at the
> javadoc, my hope is to get the api docs good enough so that it is
> self-explanatory:
>
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>
> Please take a look at this API and give me any thoughts you may have!
>
> It may also be reasonable to take a look at the configs:
>
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>
> The actual code is posted here:
> https://issues.apache.org/jira/browse/KAFKA-1227
>
> A few questions or comments to kick things off:
> 1. We need to make a decision on whether serialization of the user's key
> and value should be done by the user (with our api just taking byte[]) or
> if we should take an object and allow the user to configure a Serializer
> class which we instantiate via reflection. We take the later approach in
> the current producer, and I have carried this through to this prototype.
> The tradeoff I see is this: taking byte[] is actually simpler, the user can
> directly do whatever serialization they like. The complication is actually
> partitioning. Currently partitioning is done by a similar plug-in api
> (Partitioner) which the user can implement and configure to override how
> partitions are assigned. If we take byte[] as input then we have no access
> to the original object and partitioning MUST be done on the byte[]. This is
> fine for hash partitioning. However for various types of semantic
> partitioning (range partitioning, or whatever) you would want access to the
> original object. In the current approach a producer who wishes to send
> byte[] they have serialized in their own code can configure the
> BytesSerialization we supply which is just a "no op" serialization.
> 2. We should obsess over naming and make sure each of the class names are
> good.
> 3. Jun has already pointed out that we need to include the topic and
> partition in the response, which is absolutely right. I haven't done that
> yet but that definitely needs to be there.
> 4. Currently RecordSend.await will throw an exception if the request
> failed. The intention here is that producer.send(message).await() exactly
> simulates a synchronous call. Guozhang has noted that this is a little
> annoying since the user must then catch exceptions. However if we remove
> this then if the user doesn't check for errors they won't know one has
> occurred, which I predict will be a common mistake.
> 5. Perhaps there is more we could do to make the async callbacks and future
> we give back intuitive and easy to program against?
>
> Some background info on implementation:
>
> At a high level the primary difference in this producer is that it removes
> the distinction between the "sync" and "async" producer. Effectively all
> requests are sent asynchronously but always return a future response object
> that gives the offset as well as any error that may have occurred when the
> request is complete. The batching that is done in the async producer only
> today is done whenever possible now. This means that the sync producer,
> under load, can get performance as good as the async producer (preliminary
> results show the producer getting 1m messages/sec). This works similar to
> group commit in databases but with respect to the actual network
> transmission--any messages that arrive while a send is in progress are
> batched together. It is also possible to encourage batching even under low
> load to save server resources by introducing a delay on the send to allow
> more messages to accumulate; this is done using the linger.ms config (this
> is similar to Nagle's algorithm in TCP).
>
> This producer does all network communication asynchronously and in parallel
> to all servers so the performance penalty for acks=-1 and waiting on
> replication should be much reduced. I haven't done much benchmarking on
> this yet, though.
>
> The high level design is described a little here, though this is now a
> little out of date:
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> -Jay
>

Reply via email to