Cool. With respect to compression performance, we definitely see the same
thing, no debate.

Of course if you want to just compress the message payloads you can do that
now without needing much help from kafka--just pass in the compressed data.
Whether or not it will do much depends on the size of the message body--for
small messages you basically need batch compression, but for large messages
just compressing the body is fine. Our extra effort was to get the better
compression ratio you get from compressing messages together as a batch.
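
For example (untested sketch; assumes the snappy-java library we already pull
in, and the exact producer class names here are from memory):

  byte[] payload = serialize(event);                               // your own serialization
  byte[] compressed = org.xerial.snappy.Snappy.compress(payload);  // throws IOException
  producer.send(new KeyedMessage<byte[], byte[]>("my-topic", compressed));
  // note this only compresses each body on its own, so it mostly helps
  // for larger messages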

What I was saying about snappy performance is that I think it may be our
inefficiency in the compression code-path rather than the underlying
slowness of snappy. For example, on this page:
  https://github.com/dain/snappy
the compression performance they list for the JNI library (the one we use) tends to
be around 200MB per core-second, with decompression around 1GB per
core-second. So on a modern machine with umpteen cores that should not be a
bottleneck, right? I don't know this to be true but I am wondering if the
underlying bottleneck is the compression algorithm or our inefficient
code. If you look at kafka.message.ByteBufferMessageSet.{create,
decompress, assignOffsets}, it is pretty inefficient. I did a round of
improvement there but we are still recopying stuff over and over and
creating zillions of little buffers and objects. It is a little tricky to
clean up but probably just a 1-2 day project.
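
One cheap way to check whether snappy itself could even be the bottleneck is
to time it on a batch-sized chunk completely outside our code path, e.g.
(rough, untested; real message data would be more representative than this
repeated string, and you'd want to run it a few times to warm up the JIT):

  byte[] chunk = "some representative message body text ".getBytes();
  byte[] batch = new byte[1 << 20];            // ~1MB of semi-compressible data
  for (int i = 0; i < batch.length; i++)
    batch[i] = chunk[i % chunk.length];
  int iterations = 1000;
  long start = System.nanoTime();
  for (int i = 0; i < iterations; i++)
    org.xerial.snappy.Snappy.compress(batch);  // throws IOException
  double secs = (System.nanoTime() - start) / 1e9;
  System.out.printf("~%.0f MB/core-second%n",
                    iterations * (batch.length / 1e6) / secs);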

I would rather confirm that it is really the compression algorithm that is the
root cause, rather than just our inefficiency, before we do anything too
drastic design-wise. If this is really killing you guys, and if that turns
out to be the cause, we would definitely take a patch to optimize that path
now.

-Jay




On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <csho...@gmail.com> wrote:

> Thanks for the responses. Additional follow-up inline.
>
>
> On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Great comments, answers inline!
> >
> > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <csho...@gmail.com> wrote:
> >
> > > These sounds like great steps. A couple of votes and questions:
> > >
> > > 1.  Moving serialization out and basing it all off of byte[] for key
> and
> > > payload makes sense. Echoing a response below, we've ended up doing
> that
> > in
> > > some cases anyway, and the others do a trivial transform to bytes with
> an
> > > Encoder.
> > >
> >
> > Cool.
> >
> >
> > > 2. On the single producer thread, we're actually suffering a bit from
> > this
> > > in 0.8, but it's mostly because compression and the blocking send
> happen
> > on
> > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > side-effect
> > > was that compression and the blocking could "go wide", at least to the
> > > number of brokers. If compression is moved out and the sends are now
> > > non-blocking then this sounds like a nice improvement.
> > >
> >
> > I think even in 0.7 there was only one thread, right?
> >
> >
> I believe it was actually 1 per broker. Producer.scala iterates the brokers
> and adds a new producer for each. The ProducerPool.addProducer() method
> adds a new AsyncProducer instance for the broker (assuming async mode), and
> each AsyncProducer creates and starts its own ProducerSendThread.
>
> In either case, going to multiplexed I/O and not having the compression on
> this thread probably solves any issue there.
>
>
>
> >
> > > 3. The wiki talks about static partition assignment for consumers. Just
> > > adding a vote for that as we're currently working through how to do
> that
> > > ourselves with the 0.8 consumer.
> > >
> >
> > Cool, yeah currently you must use the simple consumer to get that which
> is
> > a pain.
> >
> >
> > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > buffering you've described. If I'm following correctly you've said that
> > > rather than queueing objects you'd end up doing in-place writes to the
> > > pre-allocated ByteBuffer. Presumably this means the compression has
> > already
> > > happened on the user thread. But if there's no batching/buffering
> except
> > in
> > > the ByteBuffer, is there somewhere that multiple messages will be
> > > compressed together (since it should result in better compression)?
> Maybe
> > > there's still batching before this and I read too much into it?
> > >
> >
> > I'm not 100% sure, but I believe the compression can still be done
> inline.
> > The compression algorithm will buffer a bit, of course. What we currently
> > do though is write out the full data uncompressed and then compress it.
> > This is pretty inefficient. Basically we are using Java's OutputStream
> apis
> > for compression but we need to be using the lower-level array-oriented
> > algorithms like Deflater. I haven't tried this but my assumption is
> that
> > we can compress the messages as they arrive into the destination buffer
> > instead of the current approach.
> >
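> > Roughly the shape I have in mind (totally untested sketch; maxBatchSize is
> > just a placeholder, this is plain deflate ignoring the gzip framing, and
> > snappy has similar array-oriented calls):
> >
> >   // sketch: compress each message straight into the destination buffer we
> >   // will eventually send, instead of building the full uncompressed set
> >   // first and then recompressing it
> >   // (uses java.util.zip.Deflater and java.nio.ByteBuffer)
> >   Deflater deflater = new Deflater();
> >   ByteBuffer dest = ByteBuffer.allocate(maxBatchSize);  // buffer we will send
> >   byte[] scratch = new byte[4096];
> >
> >   void append(byte[] messageBytes) {
> >     deflater.setInput(messageBytes);
> >     int n;
> >     do {   // drain the compressor's output into the destination buffer
> >       n = deflater.deflate(scratch, 0, scratch.length, Deflater.SYNC_FLUSH);
> >       dest.put(scratch, 0, n);             // real code would check dest.remaining()
> >     } while (n == scratch.length);
> >   }
> >   // when the batch is done: deflater.finish(), drain the same way, send dest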
>
>
> Right, was starting to think you may be looking at a way of doing the
> compression incrementally as they come in. Sounds like what you're
> pursuing.
>
>
>
> >
> >
> > > 5. I don't know if this is quite the right place to discuss it, but
> since
> > > the producer has some involvement I'll throw it out there. The
> > un-compress,
> > > assign offsets, re-compress that happens on the broker with the
> built-in
> > > compression API is a significant bottleneck that we're really trying to
> > > avoid. As noted in another thread, we saw a throughput increase on the
> > > order of 3x when we pre-batched and compressed the payloads before
> > sending
> > > it to the producer with 0.8.
> > >
> >
> > Yes, it is a bummer. We think ultimately this does make sense though, for
> > two reasons beyond offsets:
> > 1. You have to validate the integrity of the data the client has sent to
> > you or else one bad or buggy client can screw up all consumers.
> > 2. The compression of the log should not be tied to the compression used
> by
> > individual producers. We haven't made this change yet, but it is an easy
> > one. The problem today is that if your producers send a variety of
> > compression types your consumers need to handle the union of all types
> and
> > you have no guarantee over what types producers may send in the future.
> > Instead we think these should be decoupled. The topic should have a
> > compression type property and that should be totally decoupled from the
> > compression type the producer uses. In many cases there is no real need
> for
> > the producer to use compression at all as the real thing you want to
> > optimize is later inter-datacenter transfers, not the network send to the
> > local broker, so the producer can just send uncompressed and have the
> broker
> > control the compression type.
> >
> > The performance really has two causes though:
> > 1. GZIP is super slow, especially java's implementation. But snappy, for
> > example, is actually quite fast. We should be able to do snappy at
> network
> > speeds according to the perf data I have seen, but...
> > 2. ...our current compression code is kind of inefficient due to all the
> > copying and traversal, due to the reasons cited above.
> >
> > So in other words I think we can make this a bit better but it probably
> > won't go away. How do you feel about snappy?
> >
> >
> Sorry, I should have been more clear--these tests were all with Snappy (the
> same library Kafka uses, just called directly from our code before it went
> to the producer). I did an early GZIP test and it was just too far out of
> the ballpark to be useful.
>
> I completely understand the architectural separation and the value you're
> describing here, especially in a general solution where you may have many
> heterogeneous producer and consumer types. In our case it will be pretty
> homogeneous and throughput is a primary concern, hence the focus on this.
>
> I started putting in a description of the benchmarks we did but it's going
> to blow up this thread, so it's probably best if that goes in its own
> separate thread. The summary is that at an application level, this change
> alone is the difference between being able to send 25,000 (2.5KB)
> messages/sec to a single broker vs over 80,000/sec. For comparison, I did a
> small test that simply wrote messages to the kafka logs via the Log class
> (in a standalone app on that machine, not through a server) and saw around
> ~170,000 messages/sec. The throughput to the disk as reported by iostat
> reflected a similar change.
>
> Obviously without more detail you'll have to take those numbers as a rough
> sketch, and I'm happy to give more detail separately, but that's a high
> enough cost on the broker that we really think we need to avoid it.
>
>
>
> > > I've not looked very closely at the wire-protocol, but if there was a
> way
> > > for it to support in-place offset assignment even for compressed
> messages
> > > it would be a huge win. Short of that we're fine taking the
> > batch/compress
> > > responsibility into user code, but it would be nice to have a way to do
> > > that while retaining the built-in partition selection (i.e. semantic
> > > partitioning) and other functionality of the producer. The new design
> may
> > > already be an improvement in this area since it would move some
> > > responsibility to the user thread.
> > >
> >
> > We can't really do this because we are multi-writer so any offset we give
> > the client would potentially be used by another producer and then be
> > invalid or non-sequential.
> >
>
> I may have said this in a confusing way. With the tests we did it was still
> the broker assigning offsets, it's just that the message as a whole wasn't
> compressed, only the payloads. So the broker still had plain-bytes access
> to the headers and went through the optimized code path that exists for
> non-compressed messages.
>
>
> Really appreciate your responses and glad to see this making progress.
>
>
>
> >
> > >
> > > Not sure if that's clear, but as the interfaces take shape it may be
> > easier
> > > to see how that will work.
> > >
> > > -Chris
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jul 26, 2013 at 1:00 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >
> > > > I sent around a wiki a few weeks back proposing a set of client
> > > > improvements that essentially amount to a rewrite of the producer and
> > > > consumer java clients.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > The below discussion assumes you have read this wiki.
> > > >
> > > > I started to do a little prototyping for the producer and wanted to
> > share
> > > > some of the ideas that came up to get early feedback.
> > > >
> > > > First, a few simple but perhaps controversial things to discuss.
> > > >
> > > > Rollout
> > > > Phase 1: We add the new clients. No change on the server. Old clients
> > > still
> > > > exist. The new clients will be entirely in a new package so there
> will
> > be
> > > > no possibility of name collision.
> > > > Phase 2: We swap out all shared code on the server to use the new
> > client
> > > > stuff. At this point the old clients still exist but are essentially
> > > > deprecated.
> > > > Phase 3: We remove the old client code.
> > > >
> > > > Java
> > > > I think we should do the clients in java. Making our users deal with
> > > > scala's compatibility issues and crazy stack traces causes
> people a
> > > lot
> > > > of pain. Furthermore we end up having to wrap everything now to get a
> > > > usable java api anyway for non-scala people. This does mean
> > maintaining a
> > > > substantial chunk of java code, which is maybe less fun than scala.
> But
> > > > basically I think we should optimize for the end user and produce a
> > > > standalone pure-java jar with no dependencies.
> > > >
> > > > Jars
> > > > We definitely want to separate out the client jar. There is also a
> fair
> > > > amount of code shared between both (exceptions, protocol definition,
> > > utils,
> > > > and the message set implementation). Two approaches.
> > > > Two jar approach: split kafka.jar into kafka-clients.jar and
> > > > kafka-server.jar with the server depending on the clients. The
> > advantage
> > > of
> > > > this is that it is simple. The disadvantage is that things like utils
> > and
> > > > protocol definition will be in the client jar though technically they
> > > belong
> > > > equally to the server.
> > > > Many jar approach: split kafka.jar into kafka-common.jar,
> > > > kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
> > > > kafka-server.jar. The disadvantage of this is that the user needs two
> > > jars
> > > > (common + something) which is for sure going to confuse people. I
> also
> > > > think this will tend to spawn more jars over time.
> > > >
> > > > Background threads
> > > > I am thinking of moving both serialization and compression out of the
> > > > background send thread. I will explain a little about this idea
> below.
> > > >
> > > > Serialization
> > > > I am not sure if we should handle serialization in the client at all.
> > > > Basically I wonder if our own API wouldn't just be a lot simpler if
> we
> > > took
> > > > a byte[] key and byte[] value and let people serialize stuff
> > themselves.
> > > > Injecting a class name for us to create the serializer is more
> > roundabout
> > > > and has a lot of problems if the serializer itself requires a lot of
> > > > configuration or other objects to be instantiated.
> > > >
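> > > > I.e. the api would just take bytes and the user would do something like
> > > > this (illustrative only; the encoder here is whatever the user already has):
> > > >
> > > >   byte[] key = userId.getBytes();              // user handles serialization
> > > >   byte[] value = myAvroEncoder.encode(event);  // or protobuf, json, etc.
> > > >   producer.send(new ProducerRecord(topic, key, value));
> > > >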
> > > > Partitioning
> > > > The real question with serialization is whether the partitioning
> should
> > > > happen on the java object or on the byte array key. The argument for
> > > doing
> > > > it on the java object is that it is easier to do something like a
> range
> > > > partition on the object. The problem with doing it on the object is
> > that
> > > > the consumer may not be in java and so may not be able to reproduce
> the
> > > > partitioning. For example we currently use Object.hashCode which is a
> > > > little sketchy. We would be better off doing a standardized hash
> > function
> > > > on the key bytes. If we want to give the partitioner access to the
> > > original
> > > > java object then obviously we need to handle serialization behind our
> > > api.
> > > >
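> > > > E.g. a standardized hash on the key bytes could be as simple as this
> > > > sketch (the exact hash function is tbd; something like murmur would be
> > > > better than this toy one):
> > > >
> > > >   // partition on the raw key bytes so any client language can reproduce it
> > > >   int partition(byte[] keyBytes, int numPartitions) {
> > > >     int h = 17;
> > > >     for (byte b : keyBytes)
> > > >       h = 31 * h + b;
> > > >     return (h & 0x7fffffff) % numPartitions;
> > > >   }
> > > >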
> > > > Names
> > > > I think good names are important. I would like to rename the
> following
> > > > classes in the new client:
> > > >   Message=>Record: Now that the message has both a message and a key
> it
> > > is
> > > > more of a KeyedMessage. Another name for a KeyedMessage is a Record.
> > > >   MessageSet=>Records: This isn't too important but nit pickers
> > complain
> > > > that it is not technically a Set but rather a List or Sequence but
> > > > MessageList sounds funny to me.
> > > >
> > > > The actual clients will not interact with these classes. They will
> > > interact
> > > > with a ProducerRecord and ConsumerRecord. The reason for having
> > different
> > > > fields is because the different clients need different information in their records.
> > > > Proposed producer API:
> > > > SendResponse r = producer.send(new ProducerRecord(topic, key, value))
> > > >
> > > > Protocol Definition
> > > >
> > > > Here is what I am thinking about protocol definition. I see a couple
> of
> > > > problems with what we are doing currently. First the protocol
> > definition
> > > is
> > > > spread throughout a bunch of custom java objects. The error reporting
> > in
> > > > these objects is really terrible because they don't record the field
> > > names.
> > > > Furthermore people keep adding business logic into the protocol
> objects
> > > > which is pretty nasty.
> > > >
> > > > I would like to move to having a single Protocol.java file that
> defines
> > > the
> > > > protocol in a readable DSL. Here is what I am thinking:
> > > >
> > > >   public static Schema REQUEST_HEADER =
> > > >     new Schema(new Field("api_key", INT16, "The id of the request type."),
> > > >                new Field("api_version", INT16, "The version of the API."),
> > > >                new Field("correlation_id", INT32, "A user-supplied integer value that will be passed back with the response"),
> > > >                new Field("client_id", STRING, "A user specified identifier for the client making the request."));
> > > >
> > > > To parse one of these requests you would do
> > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > >    short apiKey = struct.get("api_key");
> > > >
> > > > Internally Struct is just an Object[] with one entry per field which
> is
> > > > populated from the schema. The mapping of name to array index is a
> hash
> > > > table lookup. We can optimize access for performance critical areas
> by
> > > > allowing:
> > > >    // do this once to look up the index of the field
> > > >    static Field apiKeyField = REQUEST_HEADER.getField("api_key");
> > > >    ...
> > > >    Struct struct = REQUEST_HEADER.parse(bytebuffer);
> > > >    short apiKey = struct.get(apiKeyField); // now this is just an array access
> > > >
> > > > One advantage of this is this level of indirection will make it
> really
> > > easy
> > > > for us to handle backwards compatability in a more principled way.
> The
> > > > protocol file will actually contain ALL versions of the schema and we
> > > will
> > > > always use the appropriate version to read the request (as specified
> in
> > > the
> > > > header).
> > > >
> > > > NIO layer
> > > >
> > > > The plan is to add a non-blocking multi-connection abstraction that
> > would
> > > > be used by both clients.
> > > >
> > > > class Selector {
> > > >   /* create a new connection and associate it with the given id */
> > > >   public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
> > > >   /* wakeup this selector if it is currently awaiting data */
> > > >   public void wakeup()
> > > >   /* user provides sends, receives, and a timeout. this method will populate the
> > > >      "completed" and "disconnects" lists. Method blocks for up to the timeout
> > > >      waiting for data to read. */
> > > >   public void poll(long timeout, List<Send> sends, List<Send> completed,
> > > >                    List<Receive> receives, List<Integer> disconnects)
> > > > }
> > > >
> > > > The consumer and producer would then each define their own logic to
> > > manage
> > > > their set of in-flight requests.
> > > >
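> > > > For the producer the sender loop would look very roughly like this (pseudo-ish
> > > > sketch, not the actual api surface; requestsReadyToSend, handleResponses, and
> > > > handleDisconnects are made-up helper names):
> > > >
> > > >   List<Send> sends = new ArrayList<Send>();
> > > >   List<Send> completed = new ArrayList<Send>();
> > > >   List<Receive> receives = new ArrayList<Receive>();
> > > >   List<Integer> disconnects = new ArrayList<Integer>();
> > > >   while (running) {
> > > >     sends.clear(); completed.clear(); receives.clear(); disconnects.clear();
> > > >     sends.addAll(requestsReadyToSend());   // drained from the record buffers
> > > >     selector.poll(pollTimeoutMs, sends, completed, receives, disconnects);
> > > >     handleResponses(receives);             // complete the pending responses
> > > >     handleDisconnects(disconnects);        // requeue or error out in-flight requests
> > > >   }
> > > >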
> > > > Producer Implementation
> > > >
> > > > There are a couple of interesting changes I think we can make to the
> > > > producer implementation.
> > > >
> > > > We retain the single background "sender" thread.
> > > >
> > > > But we can remove the definition of sync vs async clients. We always
> > > return
> > > > a "future" response immediately. Both sync and async sends would go
> > > through
> > > > the buffering that we currently do for the async layer. This would
> mean
> > > > that even in sync mode while the event loop is doing network IO if
> many
> > > > requests accumulate they will be sent in a single batch. This
> > effectively
> > > > acts as a kind of "group commit". So instead of having an "async"
> mode
> > > that
> > > > acts differently in some way you just have a max.delay time that
> > controls
> > > > how long the client will linger waiting for more data to accumulate.
> > > > max.delay=0 is equivalent to the current sync producer.
> > > >
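> > > > In other words usage would look something like this (names not settled,
> > > > just the shape; await() is a placeholder for whatever the blocking call
> > > > on the response ends up being):
> > > >
> > > >   // "sync" and "async" are just two usages of the same api
> > > >   SendResponse r = producer.send(new ProducerRecord(topic, key, value));
> > > >   r.await();   // sync: block until this batch is acked
> > > >   // async: don't block; with max.delay > 0 the client lingers so
> > > >   // concurrent sends get grouped into one request per broker
> > > >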
> > > > I would also propose changing our buffering strategy. Currently we
> > queue
> > > > unserialized requests in a BlockingQueue. This is not ideal as it is
> > very
> > > > difficult to reason about the memory usage of this queue. One 5MB
> > message
> > > > may be bigger than 10k small messages. I propose that (1) we change
> our
> > > > queuing strategy to queue per-partition and (2) we directly write the
> > > > messages to the ByteBuffer which will eventually be sent and use that
> > as
> > > > the "queue". The batch size should likewise be in bytes not in number
> > of
> > > > messages.
> > > >
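> > > > A rough picture of that (sketch only; TopicPartition, markReadyToSend,
> > > > and maxBatchSize are just made-up names):
> > > >
> > > >   // one pre-allocated buffer per partition acts as the "queue";
> > > >   // records are written directly into it on the user thread
> > > >   Map<TopicPartition, ByteBuffer> batches = new HashMap<TopicPartition, ByteBuffer>();
> > > >
> > > >   void append(TopicPartition tp, byte[] record) {
> > > >     ByteBuffer buf = batches.get(tp);
> > > >     if (buf == null || buf.remaining() < record.length) {
> > > >       if (buf != null)
> > > >         markReadyToSend(tp, buf);                // full batch: hand to the sender thread
> > > >       buf = ByteBuffer.allocate(maxBatchSize);   // max.batch.size, in bytes
> > > >       batches.put(tp, buf);
> > > >     }
> > > >     buf.put(record);                             // write in place, no intermediate objects
> > > >   }
> > > >   // the sender also drains a buffer when max.delay elapses, even if not full
> > > >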
> > > > If you think about it our current queuing strategy doesn't really
> make
> > > > sense any more now that we always load balance over brokers. You set
> a
> > > > batch size of N and we do a request when we have N messages in queue
> > but
> > > > this says nothing about the size of the requests that will be sent.
> You
> > > > might end up sending all N messages to one server or you might end up
> > > > sending 1 message to N different servers (totally defeating the
> purpose
> > > of
> > > > batching).
> > > >
> > > > There are two granularities of batching that could make sense: the
> > broker
> > > > level or the partition level. We do the send requests at the broker
> > level
> > > > but we do the disk IO at the partition level. I propose making the
> > queues
> > > > per-partition rather than per broker to avoid having to reshuffle the
> > > > contents of queues when leadership changes. This could be debated,
> > > though.
> > > >
> > > > If you actually look at the byte path of the producer this approach
> > > allows
> > > > cleaning a ton of stuff up. We can do in-place writes to the
> destination
> > > > buffer that we will eventually send. This does mean moving
> > serialization
> > > > and compression to the user thread. But I think this is good as these
> > may
> > > > be slow but aren't unpredictably slow.
> > > >
> > > > The per-partition queues are thus implemented with a bunch of
> > > pre-allocated
> > > > ByteBuffers sized to max.batch.size, when the buffer is full or the
> > delay
> > > > time elapses that buffer is sent.
> > > >
> > > > By doing this we could actually just reuse the buffers when the send
> is
> > > > complete. This would be nice because, since the buffers are reused, they
> > > > will likely fall out of the young gen.
> > > >
> > > > A question to think about is how we want to bound memory usage. I
> think
> > > > what we want is the max.batch.size which controls the size of the
> > > > individual buffers and total.buffer.memory which controls the total
> > > memory
> > > > used by all buffers. One problem with this is that there is the
> > > possibility
> > > > of some fragmentation. I.e. imagine a situation with 5k partitions
> being
> > > > produced to, each getting a low but steady message rate. Giving each
> of
> > > > these a 1MB buffer would require 5GB of buffer space to have a buffer
> > for
> > > > each partition. I'm not sure how bad this is since at least the
> memory
> > > > usage is predictable and the reality is that holding thousands of
> java
> > > > objects has huge overhead compared to contiguous byte arrays.
> > > >
> > > > -Jay
> > > >
> > >
> >
>
