These sounds like great steps. A couple of votes and questions:

1.  Moving serialization out and basing it all off of byte[] for key and
payload makes sense. Echoing a response below, we've ended up doing that in
some cases anyway, and the others do a trivial transform to bytes with an
Encoder.

2. On the single producer thread, we're actually suffering a bit from this
in 0.8, but it's mostly because compression and the blocking send happen on
this thread. In 0.7 since there was a thread-per-broker, a nice side-effect
was that compression and the blocking could "go wide", at least to the
number of brokers. If compression is moved out and the sends are now
non-blocking then this sounds like a nice improvement.

3. The wiki talks about static partition assignment for consumers. Just
adding a vote for that as we're currently working through how to do that
ourselves with the 0.8 consumer.

4. I'm curious how compression would interact with the new ByteBuffer
buffering you've described. If I'm following correctly you've said that
rather than queueing objects you'd end up doing in-place writes to the
pre-allocated ByteBuffer. Presumably this means the compression has already
happened on the user thread. But if there's no batching/buffering except in
the ByteBuffer, is there somewhere that multiple messages will be
compressed together (since it should result in better compression)? Maybe
there's still batching before this and I read too much into it?

5. I don't know if this is quite the right place to discuss it, but since
the producer has some involvement I'll throw it out there. The un-compress,
assign offsets, re-compress that happens on the broker with the built-in
compression API is a significant bottleneck that we're really trying to
avoid. As noted in another thread, we saw a throughput increase on the
order of 3x when we pre-batched and compressed the payloads before sending
it to the producer with 0.8.

I've not looked very closely at the wire-protocol, but if there was a way
for it to support in-place offset assignment even for compressed messages
it would be a huge win. Short of that we're fine taking the batch/compress
responsibility into user code, but it would be nice to have a way to do
that while retaining the built-in partition selection (i.e. semantic
partitioning) and other functionality of the producer. The new design may
already be an improvement in this area since it would move some
responsibility to the user thread.

Not sure if that's clear, but as the interfaces take shape it may be easier
to see how that will work.

-Chris






On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> I sent around a wiki a few weeks back proposing a set of client
> improvements that essentially amount to a rewrite of the producer and
> consumer java clients.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
>
> The below discussion assumes you have read this wiki.
>
> I started to do a little prototyping for the producer and wanted to share
> some of the ideas that came up to get early feedback.
>
> First, a few simple but perhaps controversial things to discuss.
>
> Rollout
> Phase 1: We add the new clients. No change on the server. Old clients still
> exist. The new clients will be entirely in a new package so there will be
> no possibility of name collision.
> Phase 2: We swap out all shared code on the server to use the new client
> stuff. At this point the old clients still exist but are essentially
> deprecated.
> Phase 3: We remove the old client code.
>
> Java
> I think we should do the clients in java. Making our users deal with
> scala's non-compatability issues and crazy stack traces causes people a lot
> of pain. Furthermore we end up having to wrap everything now to get a
> usable java api anyway for non-scala people. This does mean maintaining a
> substantial chunk of java code, which is maybe less fun than scala. But
> basically i think we should optimize for the end user and produce a
> standalone pure-java jar with no dependencies.
>
> Jars
> We definitely want to separate out the client jar. There is also a fair
> amount of code shared between both (exceptions, protocol definition, utils,
> and the message set implementation). Two approaches.
> Two jar approach: split kafka.jar into kafka-clients.jar and
> kafka-server.jar with the server depending on the clients. The advantage of
> this is that it is simple. The disadvantage is that things like utils and
> protocol definition will be in the client jar though technical they belong
> equally to the server.
> Many jar approach: split kafka.jar into kafka-common.jar,
> kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> kafka-server.jar. The disadvantage of this is that the user needs two jars
> (common + something) which is for sure going to confuse people. I also
> think this will tend to spawn more jars over time.
>
> Background threads
> I am thinking of moving both serialization and compression out of the
> background send thread. I will explain a little about this idea below.
>
> Serialization
> I am not sure if we should handle serialization in the client at all.
> Basically I wonder if our own API wouldn't just be a lot simpler if we took
> a byte[] key and byte[] value and let people serialize stuff themselves.
> Injecting a class name for us to create the serializer is more roundabout
> and has a lot of problems if the serializer itself requires a lot of
> configuration or other objects to be instantiated.
>
> Partitioning
> The real question with serialization is whether the partitioning should
> happen on the java object or on the byte array key. The argument for doing
> it on the java object is that it is easier to do something like a range
> partition on the object. The problem with doing it on the object is that
> the consumer may not be in java and so may not be able to reproduce the
> partitioning. For example we currently use Object.hashCode which is a
> little sketchy. We would be better off doing a standardized hash function
> on the key bytes. If we want to give the partitioner access to the original
> java object then obviously we need to handle serialization behind our api.
>
> Names
> I think good names are important. I would like to rename the following
> classes in the new client:
>   Message=>Record: Now that the message has both a message and a key it is
> more of a KeyedMessage. Another name for a KeyedMessage is a Record.
>   MessageSet=>Records: This isn't too important but nit pickers complain
> that it is not technically a Set but rather a List or Sequence but
> MessageList sounds funny to me.
>
> The actual clients will not interact with these classes. They will interact
> with a ProducerRecord and ConsumerRecord. The reason for having different
> fields is because the different clients
> Proposed producer API:
> SendResponse r = producer.send(new ProducerRecord(topic, key, value))
>
> Protocol Definition
>
> Here is what I am thinking about protocol definition. I see a couple of
> problems with what we are doing currently. First the protocol definition is
> spread throughout a bunch of custom java objects. The error reporting in
> these object is really terrible because they don't record the field names.
> Furthermore people keep adding business logic into the protocol objects
> which is pretty nasty.
>
> I would like to move to having a single Protocol.java file that defines the
> protocol in a readable DSL. Here is what I am thinking:
>
>   public static Schema REQUEST_HEADER =
>
>     new Schema(new Field("api_key", INT16, "The id of the request type."),
>
>                new Field("api_version", INT16, "The version of the API."),
>
>                  new Field("correlation_id", INT32, "A user-supplied
> integer value that will be passed back with the response"),
>
>                  new Field("client_id", STRING, "A user specified
> identifier for the client making the request."));
>
> To parse one of these requests you would do
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get("api_key");
>
> Internally Struct is just an Object[] with one entry per field which is
> populated from the schema. The mapping of name to array index is a hash
> table lookup. We can optimize access for performance critical areas by
> allowing:
>    static Field apiKeyField = REQUEST_HEADER.getField("api_key"); // do
> this once to lookup the index of the field
>    ...
>    Struct struct = REQUEST_HEADER.parse(bytebuffer);
>    short apiKey = struct.get(apiKeyField); // now this is just an array
> access
>
> One advantage of this is this level of indirection will make it really easy
> for us to handle backwards compatability in a more principled way. The
> protocol file will actually contain ALL versions of the schema and we will
> always use the appropriate version to read the request (as specified in the
> header).
>
> NIO layer
>
> The plan is to add a non-blocking multi-connection abstraction that would
> be used by both clients.
>
> class Selector {
>   /* create a new connection and associate it with the given id */
>   public void connect(int id, InetSocketAddress address, intsendBufferSize,
> int receiveBufferSize)
>   /* wakeup this selector if it is currently awaiting data */
>   public void wakeup()
>   /* user provides sends, recieves, and a timeout. this method will
> populate "completed" and "disconnects" lists. Method blocks for up to the
> timeout waiting for data to read. */
>   public void poll(long timeout, List<Send> sends, List<Send> completed,
> List<Receive> receives, List<Integer> disconnects)
> }
>
> The consumer and producer would then each define their own logic to manage
> their set of in-flight requests.
>
> Producer Implementation
>
> There are a couple of interesting changes I think we can make to the
> producer implementation.
>
> We retain the single background "sender" thread.
>
> But we can remove the definition of sync vs async clients. We always return
> a "future" response immediately. Both sync and async sends would go through
> the buffering that we currently do for the async layer. This would mean
> that even in sync mode while the event loop is doing network IO if many
> requests accumulate they will be sent in a single batch. This effectively
> acts as a kind of "group commit". So instead of having an "async" mode that
> acts differently in some way you just have a max.delay time that controls
> how long the client will linger waiting for more data to accumulate.
> max.delay=0 is equivalent to the current sync producer.
>
> I would also propose changing our buffering strategy. Currently we queue
> unserialized requests in a BlockingQueue. This is not ideal as it is very
> difficult to reason about the memory usage of this queue. One 5MB message
> may be bigger than 10k small messages. I propose that (1) we change our
> queuing strategy to queue per-partition and (2) we directly write the
> messages to the ByteBuffer which will eventually be sent and use that as
> the "queue". The batch size should likewise be in bytes not in number of
> messages.
>
> If you think about it our current queuing strategy doesn't really make
> sense any more now that we always load balance over brokers. You set a
> batch size of N and we do a request when we have N messages in queue but
> this says nothing about the size of the requests that will be sent. You
> might end up sending all N messages to one server or you might end up
> sending 1 message to N different servers (totally defeating the purpose of
> batching).
>
> There are two granularities of batching that could make sense: the broker
> level or the partition level. We do the send requests at the broker level
> but we do the disk IO at the partition level. I propose making the queues
> per-partition rather than per broker to avoid having to reshuffle the
> contents of queues when leadership changes. This could be debated, though.
>
> If you actually look at the byte path of the producer this approach allows
> cleaning a ton of stuff up. We can do in-pace writes to the destination
> buffer that we will eventually send. This does mean moving serialization
> and compression to the user thread. But I think this is good as these may
> be slow but aren't unpredictably slow.
>
> The per-partition queues are thus implemented with a bunch of pre-allocated
> ByteBuffers sized to max.batch.size, when the buffer is full or the delay
> time elapses that buffer is sent.
>
> By doing this we could actually just reuse the buffers when the send is
> complete. This would be nice because since the buffers are used for
> allocation they will likely fall out of young gen.
>
> A question to think about is how we want to bound memory usage. I think
> what we want is the max.batch.size which controls the size of the
> individual buffers and total.buffer.memory which controls the total memory
> used by all buffers. One problem with this is that there is the possibility
> of some fragmentation. I.e. image a situation with 5k partitions being
> produced to, each getting a low but steady message rate. Giving each of
> these a 1MB buffer would require 5GB of buffer space to have a buffer for
> each partition. I'm not sure how bad this is since at least the memory
> usage is predictable and the reality is that holding thousands of java
> objects has huge overhead compared to contiguous byte arrays.
>
> -Jay
>

Reply via email to