Olivier,

We have a design for replication (see the design doc and subtasks at https://issues.apache.org/jira/browse/KAFKA-50). We are currently wrapping up the compression support and will start working on replication soon.
Jun

On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <[email protected]> wrote:

Thanks, guys, this was a great thread. It may be worth pointing to it in the online docs, as it asks and answers a lot of interesting questions about the performance characteristics and tradeoffs made in Kafka.

How far out do you think built-in replication is?

Best,
O.

On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <[email protected]> wrote:

Agreed, no reason the policy for handing out messages should not be configurable. We were hoping to make the whole question irrelevant with replication, since then the producer can choose the replication level it wants and fsync durability should be less of a concern.

I also agree with your comment that a good implementation of streaming with acks could be superior.

-Jay

On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <[email protected]> wrote:

Jay,

Ah - thanks for the clarification on the delay in the broker. It would be nice if that were a configuration option, so that the end user can choose to forward only messages that have been written to disk, or to have the data forwarded immediately. Once you implement replication, data hitting the disk will matter less.

On the delay in the producer, I think the question is best resolved through measurement. In your paper you compare two different approaches, and I'm proposing a third:

1. Send and wait (single message, JMS style)
2. Batch, send, and wait (Kafka today)
3. Stream with ACKs

Removing any wait for a reply should increase throughput, not decrease it, so you're likely trading latency against potential CPU efficiency. And the CPU savings is a question best resolved by measurement.

I'd also encourage you to think about the WAN case. When you send-and-wait, you need to send a buffer that is >> the bandwidth*delay product to approach full line utilization, and the line will go idle for one RTT while you stop to wait for a reply. The bandwidth*delay product can get large (tens of megabytes), and end users will rarely understand the need to tune the batch size to increase throughput. They'll just say it's slow over long distances.

All that said - your use case doesn't require minimizing latency or WAN use, so I can really understand if this isn't a priority for you.

It's a well designed product that has had some real thought put into it. It's a really promising system; thanks for taking the time to respond to my comments.

Paul

On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <[email protected]> wrote:

Ah, I think what you are describing in zeromq is essentially the equivalent of group commit for the socket: you wait until the socket is no longer writable and then begin to queue data. This is an interesting idea. Of course it would only have a positive effect when you had already overflowed the socket buffer and were sending a very high throughput of small messages. It is basically a way to degrade an overloaded synchronous send into a batched send.
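(As a concrete illustration of the "group commit for the socket" idea - a hypothetical Java/NIO sketch, not Kafka or zeromq code, with made-up class and method names: each message is handed to a non-blocking channel right away, and whatever does not fit in the socket buffer stays queued and is flushed as one gathering write the next time the channel is writable.)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of "group commit for the socket"; not Kafka or zeromq code.
public class SocketGroupCommit {
    private final SocketChannel channel;                      // assumed already configured non-blocking
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();

    public SocketGroupCommit(SocketChannel channel) {
        this.channel = channel;
    }

    // Called by the producer for every message; at low rates this drains immediately.
    public void send(ByteBuffer message) throws IOException {
        pending.add(message);
        flushWhatFits();
    }

    // Called again whenever a selector reports the channel writable (OP_WRITE).
    public void flushWhatFits() throws IOException {
        if (pending.isEmpty()) return;
        ByteBuffer[] batch = pending.toArray(new ByteBuffer[0]);
        channel.write(batch);                                  // one gathering write covers the whole backlog
        while (!pending.isEmpty() && !pending.peek().hasRemaining()) {
            pending.remove();                                  // drop buffers that were fully written
        }
    }
}

Under light load every message still goes out immediately; only when the socket buffer backs up do several messages collapse into a single write.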
This is not really the same as what we have done, which is to allow the ability to trade off latency for throughput in a configurable manner. The reason the latter is important is that we do not have a handful of producers sending at a rate that saturates the network I/O capacity of those servers (the case where the group commit would help), but rather thousands of producers sending at a medium-low volume, so we would never hit that case. The advantage of batching is fewer requests hitting the server, and larger packets. Where the group commit would help is the synchronous producer benchmarks, where you could potentially get much better throughput. This is something we should consider adding.

To be clear, though, we have not added latency in our layer, just made a configurable way to trade off latency for throughput. This is unambiguously a good thing, I think.

With respect to mmap, I think you are misunderstanding where the latency comes from. We immediately write data to the filesystem with no delay whatsoever. This incurs the overhead of a system call, as you point out, which could be avoided by mmap, but that doesn't add much in the way of latency. The latency comes from the fact that we do not make the written data available to consumers until we fsync the file to "ensure" the durability of consumed messages. The frequency of the fsync is configurable, either immediate or with a time or message-count threshold. This again trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <[email protected]> wrote:

*Producer latency* - I'm not familiar with zeromq internals, but my understanding is that they send the first message(s) immediately, and as TCP queues up the data it will eventually block as the send buffer fills; during this time messages can queue up, and the net-net is that on average the number of system calls is << the number of messages. The key is having a separate thread for network operations with very efficient thread coordination. Clearly Nagle is disabled (TCP_NODELAY), as Nagle is a blight against humanity.

Having any sort of delay adds latency. If every developer thinks it's OK to add a little latency in his layer, pretty soon you end up with 10 second end-to-end latency.

Having an "accumulated message count" is also bad for WAN performance. If your "window size" is a set of delayed messages, the only way to deal with a large bandwidth*delay product is to delay a lot of messages, then send them. You can fit a lot of data into a fiber: on a gigabit link with a 100ms roundtrip time, roughly 12.5 MB is in flight at any moment. And you need a multiple of that for buffering if you need to do a retransmit.
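(For concreteness, the arithmetic behind that figure - a small illustrative calculation, not anything from the thread:)

// Illustrative bandwidth*delay arithmetic for the 1 Gb/s, 100 ms RTT example above.
public class BandwidthDelay {
    public static void main(String[] args) {
        double bitsPerSecond = 1e9;                     // 1 Gb/s link
        double rttSeconds = 0.100;                      // 100 ms round trip
        double bdpBytes = bitsPerSecond * rttSeconds / 8;
        System.out.printf("in flight: %.1f MB%n", bdpBytes / 1e6);              // ~12.5 MB

        // A batch-send-and-wait producer transmits for batch/bandwidth seconds and then
        // idles one RTT waiting for the reply, so utilization = batch / (batch + BDP).
        // Reaching ~90% utilization therefore needs a batch of about 9x the BDP.
        System.out.printf("batch for ~90%% utilization: %.1f MB%n", 9 * bdpBytes / 1e6);
    }
}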
*Broker latency* - With mmap, the memcpy() of the message should make the data available to a thread even in another process; the pages you have mapped are also in the buffer cache and available to a sendfile() call, or at least I think so. The flush to physical disk (or msync() in this case) would still be delayed, but without impacting end-to-end latency.

That said, in benchmarks I have done, the fastest IO with the lowest CPU overhead is unbuffered (direct) IO (which is lower overhead than using the buffer cache with or without memory mapping), but then you'd have to manage your own buffer pool and run your broker in a single multithreaded process. That's getting more extreme. Just getting rid of this buffered-write delay by using memory mapping would remove a big chunk of latency.

On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <[email protected]> wrote:

Hi Paul,

We are definitely interested in lowering latency--lower is always better--but that has not been a major concern for us so far (we were replacing a system with 1 hour latency), so we haven't focused on it yet. As you describe, latency in our setup at LinkedIn comes from batching on the frontend and batching on the Kafka servers due to very lenient flush settings.

I am interested in your comments on zeromq. Do they actually have a better approach for this problem even when using TCP? If so I would be interested to understand it. The way I see it, this is about trading throughput for latency. On the producer side you have only a few options: immediately write the data to the socket buffer for sending, or wait and see if the application writes more data. The OS will do the latter for you unless you set TCP_NODELAY, but the OS is relatively inflexible; it doesn't understand your data, so I think it just waits 200ms or until the socket buffer is full.

The current approach in the async producer captures the same tradeoff, but a little more flexibly: it allows you to specify a max delay and a max accumulated message count, and data is written when either of those is hit.

Is it possible to better capture this tradeoff? Basically I am not aware of any other trick here if you are using TCP, so I would be interested in what zeromq does if they are doing this better.

We do indeed write each message set to the filesystem as it arrives, but we distribute messages to consumers only after the write has been flushed to disk; delaying (batching) that flush is the cause of the latency but also gives better use of IOPs by generating larger writes. Mmap would remove the system call (which would be good), but not the flush, I think. As you say, adding replication allows giving stronger guarantees without actually caring about durability on a particular server, which would make it possible to distribute messages to consumers after an ack from some number of other servers, irrespective of flushing to disk.

-Jay
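(A rough sketch of the dual trigger Jay describes - flush when either the max accumulated message count or the max delay is reached - purely illustrative Java, not Kafka's async producer code; the class name and settings are made up, echoing the "queue.time"/batch-size style of configuration discussed in this thread.)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical dual-trigger batcher: flush on max count OR max delay, whichever comes first.
public class DualTriggerBatcher<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int maxBatchSize;        // max accumulated message count
    private final long maxDelayMs;         // max delay, "queue.time"-style

    public DualTriggerBatcher(int maxBatchSize, long maxDelayMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMs = maxDelayMs;
    }

    // Producer-facing call: enqueue and return immediately.
    public void send(T message) {
        queue.add(message);
    }

    // Run in a dedicated sender thread; writeToSocket stands in for the network write.
    public void run(Consumer<List<T>> writeToSocket) throws InterruptedException {
        List<T> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxDelayMs;
        while (true) {
            long wait = deadline - System.currentTimeMillis();
            T msg = wait > 0 ? queue.poll(wait, TimeUnit.MILLISECONDS) : null;
            if (msg != null) batch.add(msg);

            boolean full = batch.size() >= maxBatchSize;
            boolean timedOut = System.currentTimeMillis() >= deadline;
            if ((full || timedOut) && !batch.isEmpty()) {
                writeToSocket.accept(batch);               // one network write per batch
                batch = new ArrayList<>();
                deadline = System.currentTimeMillis() + maxDelayMs;
            } else if (timedOut) {
                deadline = System.currentTimeMillis() + maxDelayMs;
            }
        }
    }
}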
On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <[email protected]> wrote:

Jun,

Thanks for your answers and the link to the paper - that helps a lot, especially the comment in the paper that 10 second end-to-end latency is good enough for your intended use case.

We're looking for much lower latencies, and the basic design of Kafka feels like it should support latencies in milliseconds with a few changes. We're either going to build our own system or help develop something that already exists, so please take my comments in the constructive way they're intended (I realize the changes I'm suggesting are outside your intended use case, but if you're interested we may be able to provide a very capable developer to help with the work, assuming we choose Kafka over the other zillion streaming systems that are coming out of the woodwork).

a. *Producer "queue.time"* - In my question 4 below, I was referring to the producer queue time. With a default value of 5 seconds, that accounts for half your end-to-end latency. A system like zeromq is optimized to write data immediately without delay, but in such a way as to minimize the number of system calls required at high message rates. Zeromq is no nirvana, but it has a number of nice properties.

b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds appears to be another significant source of latency in the system, assuming that clients are unable to access data until it has been flushed. Since you have wisely chosen to take advantage of the buffer cache as part of your system design, it seems that you could remove this latency completely by memory mapping the partitions and memcpying each message as it arrives. With the right IPC mechanism, clients could have immediate access to new messages.
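(A rough Java illustration of the memory-mapping idea in (b) - not Kafka code; the class and method names are made up, and a real log segment would need bounds checking and recovery handling.)

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical mmap-backed log segment: appends are visible through the page cache
// immediately; flushing to physical disk (force()/msync) is a separate, delayed decision.
public class MmapLogSegment {
    private final MappedByteBuffer map;

    public MmapLogSegment(Path file, int segmentBytes) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            map = ch.map(FileChannel.MapMode.READ_WRITE, 0, segmentBytes);
        }
    }

    // The "memcpy": readers mapping the same file see the bytes right away.
    public synchronized long append(byte[] message) {
        long offset = map.position();
        map.put(message);
        return offset;                 // logical position of this message in the segment
    }

    // Durability is decoupled from visibility; call this on whatever flush schedule you like.
    public void flushToDisk() {
        map.force();                   // analogous to msync()
    }
}

Visibility comes from the shared page cache; durability is still whenever force() runs, which is exactly the decoupling being suggested.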
c. *Batching, sync vs async, replication, and auditing* - It's understandable that you've chosen a forensic approach to producer reliability (after-the-fact auditing), but when you implement replication it would be really nice to revise the producer protocol mechanisms. If you used a streaming mechanism with producer offsets and ACKs, you could ensure reliable delivery of producer streams to multiple brokers without the need to choose a "batch size" or "queue.time". This could also give you active/active failover of brokers. It may also help in the WAN case (my question 3 below), because you would be able to adaptively stuff more and more data through the fiber on high bandwidth*delay links without having to choose a large "batch size" or accept the additional latency that entails. Oh, and it will help you deal with CRC errors once you start checking for them.

d. *Performance measurements* - I'd like to make a suggestion for your performance measurements. Your benchmarks measure throughput, but a throughput number is meaningless without an associated "% cpu". Ideally all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after all, this is plumbing, and we assume the cores in the system should have capacity set aside for useful work too). Obviously nobody ever achieves this, but by measuring it one can raise the bar in terms of optimization.

Paul

ps. Just for background, I am the cofounder at Quantcast, where we process 3.5PB of data per day. These questions are related to my new startup Quantbench, which will deal with financial market data where you don't want any latency at all. And WAN issues are a big deal too. Incidentally, I was also founder of Orbital Data, which was a WAN optimization company, so I've done a lot of work with protocols over long distances.
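(One possible way to report the "% cpu" figure suggested in (d) alongside a throughput number - an illustrative Java sketch; getProcessCpuTime is a HotSpot-specific extension, and runProducerBenchmark is a placeholder for whatever workload is being measured.)

import java.lang.management.ManagementFactory;

// Illustrative sketch: report throughput together with the process CPU share.
public class ThroughputWithCpu {
    public static void main(String[] args) throws Exception {
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        int cores = Runtime.getRuntime().availableProcessors();

        long cpu0 = os.getProcessCpuTime();        // nanoseconds of CPU used by this JVM so far
        long wall0 = System.nanoTime();

        long bytesSent = runProducerBenchmark();   // placeholder for the benchmark under test

        long wallNs = System.nanoTime() - wall0;
        long cpuNs = os.getProcessCpuTime() - cpu0;
        double mbPerSec = bytesSent / 1e6 / (wallNs / 1e9);
        double cpuPercent = 100.0 * cpuNs / (wallNs * (double) cores);
        System.out.printf("throughput %.1f MB/s at %.1f%% of %d cores%n", mbPerSec, cpuPercent, cores);
    }

    private static long runProducerBenchmark() { /* placeholder workload */ return 0L; }
}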
On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <[email protected]> wrote:

Paul,

Excellent questions. See my answers below. Thanks,

On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <[email protected]> wrote:

> Kafka looks like an exciting project, thanks for opening it up. I have a few questions:
>
> 1. Are checksums end to end (i.e., created by the producer and checked by the consumer)? Or are they only used to confirm buffer-cache behavior on disk, as mentioned in the documentation? Bit errors occur vastly more often than most people assume, often because of device driver bugs. TCP only detects 1 error in 65536, so errors can flow through (if you like I can send links to papers describing the need for checksums everywhere).

The checksum is generated at the producer and propagated to the broker and eventually the consumer. Currently, we only validate the checksum at the broker. We could further validate it at the consumer in the future.

> 2. The consumer has a pretty solid mechanism to ensure it hasn't missed any messages (I like the design, by the way), but how does the producer know that all of its messages have been stored? (There is no apparent message id on that side, since the message id isn't known until the message is written to the file.) I'm especially curious how failover/replication could be implemented, and I'm thinking that acks on the publisher side may help.

Producer-side auditing is not built in. At LinkedIn, we do that by generating an auditing event periodically in the event handler of the async producer. The auditing event contains the number of events produced in a configured window (e.g., 10 minutes) and is sent to a separate topic. The consumer can read the actual data and the auditing event and compare the counts. See our paper (http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf) for some more details.

> 3. Has the consumer's flow control been tested over high bandwidth*delay links? (What bandwidth can you get from a London consumer of an SF cluster?)

Yes, we actually replicate Kafka data across data centers, using an embedded consumer in a broker. Again, there is a bit more info on this in our paper.

> 4. What kind of performance do you get if you set the producer's message delay to zero? (I.e., is there a separate system call for each message, or do you manage to aggregate messages into a smaller number of system calls even with a delay of 0?)

I assume that you are referring to the flush interval. One can configure it to flush every message to disk. This will slow down the throughput significantly.
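(A loose sketch of the producer-side audit counting described in the answer to question 2 - illustrative Java only, not the LinkedIn implementation; the class name and the emitted record format are made up.)

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical audit counter for the async producer's event handler: count events per
// fixed window and periodically emit the count to a separate auditing topic, so a
// consumer can reconcile it against what it actually read from the data topic.
public class AuditCounter {
    private final long windowMs;
    private final AtomicLong count = new AtomicLong();
    private volatile long windowStart = System.currentTimeMillis();

    public AuditCounter(long windowMs) {
        this.windowMs = windowMs;      // e.g. 10 minutes
    }

    // Call once for every event the producer sends on the audited topic.
    public void record() {
        count.incrementAndGet();
    }

    // Call periodically from the event handler; returns an audit record to publish, or null.
    public String maybeEmit(String topic) {
        long now = System.currentTimeMillis();
        if (now - windowStart < windowMs) return null;
        long n = count.getAndSet(0);
        long start = windowStart;
        windowStart = now;
        // The consumer reads both the data topic and these audit records and compares counts.
        return String.format("audit topic=%s windowStart=%d count=%d", topic, start, n);
    }
}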
> 5. Have you considered using a library like zeromq for the messaging layer instead of rolling your own? (Zeromq will handle #4 cleanly at millions of messages per second and has clients in 20 languages.)

No. Our proprietary format allows us to support things like compression in the future. However, we can definitely look into the zeromq format. Is their messaging layer easily extractable?

> 6. Do you have any plans to support intermediate processing elements the way Flume does?

For now, we are just focusing on getting the raw messaging layer solid. We have worked a bit on stream processing and will look into that again in the future.

> 7. The docs mention that new versions will only be released after they are in production at LinkedIn. Does that mean that the latest version of the source code is hidden at LinkedIn, and contributors would have to throw patches over the wall and wait months to get the integrated product?

What we run at LinkedIn is the same version as the open source one, and there is no internal repository of Kafka at LinkedIn. We plan to keep it that way in the future.

> Thanks!
