Paul,

The only concern is that if we expose unflushed messages, those messages could disappear after a broker machine restart.

Jun
On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <[email protected]> wrote:

> One more suggestion:
>
> Even before you have replication, it seems that you could delay producer-side acks until after the data is recorded to disk, and still pass the data forward to consumers immediately.
>
> On Jul 19, 2011, at 12:23 PM, Jay Kreps <[email protected]> wrote:
>
> > Agreed, no reason the policy to hand out messages should not be configurable. We were hoping to make the whole question irrelevant with the replication, since then the producer can choose the replication level it wants and fsync durability should be less of a concern.
> >
> > I agree with your comment that a good implementation of streaming with acks could potentially be superior.
> >
> > -Jay
> >
> > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <[email protected]> wrote:
> >
> >> Jay,
> >>
> >> Ah - thanks for the clarification on the delay in the broker. It would be nice if that were a configuration option, so that the end user can choose either to forward only messages that have been written to disk, or to have the data forwarded immediately. When you implement replication, data hitting the disk will matter less.
> >>
> >> On the delay in the producer, I think it could best be resolved through measurement. In your paper you compare two different approaches, and I'm proposing a third:
> >>
> >> 1. Send and wait (single message, JMS style)
> >> 2. Batch, send, and wait (Kafka today)
> >> 3. Stream with ACKs
> >>
> >> Removing any wait for a reply should increase throughput, not decrease it, so you're likely trading latency against potential CPU efficiency. And the CPU savings is a question best resolved by measurement.
> >>
> >> I'd also encourage you to think about the WAN case. When you send-and-wait, you need to send a buffer that is much larger than the bandwidth-delay product to approach full line utilization, and the line will go idle for one RTT while you stop to wait for a reply. The bandwidth*delay product can get large (tens of megabytes), and end users will rarely understand the need to tune the batch size to increase throughput. They'll just say it's slow over long distances.
> >>
> >> All that said - your use case doesn't require minimizing latency or WAN use, so I can really understand if this isn't a priority for you.
> >>
> >> It's a well designed product that has had some real thought put into it. It's a really promising system; thanks for taking the time to respond to my comments.
> >>
> >> Paul
> >>
> >> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <[email protected]> wrote:
> >>
> >>> Ah, I think what you are describing in zeromq is essentially the equivalent of group commit for the socket: you wait until the socket is no longer writable and then begin to queue data. This is an interesting idea. Of course it would only have a positive effect when you had already overflowed the socket buffer and were sending a very high throughput of small messages. That is basically a way to degrade an overloaded synchronous send into a batched send.
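For illustration, a minimal sketch of that socket-level group commit, with hypothetical names and not taken from zeromq or Kafka: producer threads enqueue and return immediately, and a sender thread writes whatever accumulated while the previous write was in flight as a single batch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of socket-level "group commit": the sender never waits on a timer;
    // whatever piled up while the previous write was in flight goes out as one batch.
    public class GroupCommitSender implements Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

        public void send(byte[] message) throws InterruptedException {
            queue.put(message);              // producer threads enqueue and return immediately
        }

        @Override
        public void run() {
            List<byte[]> batch = new ArrayList<>();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    batch.add(queue.take()); // block only when there is nothing to send
                    queue.drainTo(batch);    // grab everything that accumulated meanwhile
                    writeBatch(batch);       // one write for the whole batch
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void writeBatch(List<byte[]> batch) {
            // stand-in for the actual socket write; under load batch.size() grows,
            // so the number of writes per message shrinks.
        }
    }

Under light load this degenerates to one write per message; under heavy load the batch grows on its own, which is the "degrade an overloaded synchronous send into a batched send" behavior described above.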
> >>> This is not really the same as what we have done, which is to allow the ability to trade off latency for throughput in a configurable manner. The reason the latter is important is that we do not have a handful of producers sending at a rate that saturates the network I/O capacity of those servers (the case where the group commit would help); rather, we have thousands of producers sending at a medium-low volume, so we would never hit that in our use case. The advantage of batching is fewer requests hitting the server, and larger packets. Where the group commit would help is for the synchronous producer benchmarks, where you could potentially get much better throughput. This is something we should consider adding.
> >>>
> >>> To be clear, though, we have not added latency in our layer, just made a configurable way to trade off latency for throughput. This is unambiguously a good thing, I think.
> >>>
> >>> With respect to mmap, I think you are misunderstanding where the latency comes from. We immediately write data to the filesystem with no delay whatsoever. This incurs the overhead of a system call, as you point out, which could be avoided by mmap, but that doesn't add much in the way of latency. The latency comes from the fact that we do not make the written data available to consumers until we fsync the file to "ensure" the durability of consumed messages. The frequency of the fsync is configurable: either immediate, or with a time or message-count threshold. This again trades latency for throughput.
> >>>
> >>> -Jay
> >>>
> >>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <[email protected]> wrote:
> >>>
> >>>> *Producer latency* - I'm not familiar with zeromq internals, but my understanding is that they send the first message(s) immediately and, as TCP queues up the data, it will eventually block as the send buffer fills; during this time messages can queue up, and the net-net is that on average the number of system calls is << the number of messages. The key is having a separate thread for network operations with very efficient thread coordination. Clearly Nagle is disabled (TCP_NODELAY), as Nagle is a blight against humanity.
> >>>>
> >>>> Having any sort of delay adds latency. If every developer thinks it's OK to add a little latency in his layer, pretty soon you end up with 10-second end-to-end latency.
> >>>>
> >>>> Having an "accumulated message count" is also bad for WAN performance. If your "window size" is a set of delayed messages, the only way to deal with a large bandwidth*delay product is to delay a lot of messages, then send them. You can fit a lot of data into a fiber: on a gigabit link with a 100 ms round-trip time, roughly 100 megabits (about 12 MB) is in flight at any moment, and you need multiples of that for buffering if you need to do a retransmit.
> >>>>
> >>>> *Broker latency* - With mmap, the memcpy() of the message should make the data available to a thread even in another process; the pages that you have mapped are also in the buffer cache and available to a sendfile() call, or at least I think so. The flush to physical disk (msync() in this case) would still be delayed, but without impacting end-to-end latency.
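A small sketch of the visibility point being made here, assuming Java NIO on a POSIX-style system rather than raw mmap()/memcpy(); the file name and sizes are illustrative, not the broker's actual code. Bytes put into a shared mapping are served from the page cache to other readers before any force()/msync pushes them to disk.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Bytes copied into a mapped region are readable through the page cache
    // (here via a second FileChannel) before any force()/msync pushes them to disk.
    public class MmapVisibility {
        public static void main(String[] args) throws IOException {
            Path log = Path.of("segment.log");
            byte[] msg = "hello".getBytes(StandardCharsets.UTF_8);

            try (FileChannel writer = FileChannel.open(log, StandardOpenOption.CREATE,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer map = writer.map(FileChannel.MapMode.READ_WRITE, 0, msg.length);
                map.put(msg);                  // the "memcpy": no write() system call per message

                try (FileChannel reader = FileChannel.open(log, StandardOpenOption.READ)) {
                    ByteBuffer out = ByteBuffer.allocate(msg.length);
                    reader.read(out, 0);       // sees the bytes even though map.force() was never called
                    System.out.println(new String(out.array(), StandardCharsets.UTF_8));
                }
                // map.force() (msync) could be deferred without delaying the read above.
            }
        }
    }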
> >>>> That said, in benchmarks I have done, the fastest IO with the lowest CPU overhead is unbuffered (direct) IO (which is lower overhead than using the buffer cache, with or without memory mapping), but then you'd have to manage your own buffer pool and run your broker in a single multithreaded process. But that's getting more extreme. Just getting rid of this buffer write delay by using memory mapping will remove a big chunk of latency.
> >>>>
> >>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <[email protected]> wrote:
> >>>>
> >>>>> Hi Paul,
> >>>>>
> >>>>> We are definitely interested in lowering latency (lower is always better), but that has not been a major concern for us so far (we were replacing a system with 1-hour latency), so we haven't focused on it yet. As you describe, latency in our setup at LinkedIn comes from batching on the frontend and batching on the Kafka servers due to very lenient flush settings.
> >>>>>
> >>>>> I am interested in your comments on zeromq. Do they actually have a better approach for this problem even when using TCP? If so, I would be interested to understand it. The way I see things, this is about trading throughput and latency. On the producer side you have only a few options: immediately write the data to the socket buffer for sending, or wait and see if the application writes more data. The OS will do this for you unless you set TCP_NODELAY, but the OS is relatively inflexible; it doesn't understand your data, so I think it just waits 200ms or until the socket buffer is full.
> >>>>>
> >>>>> The current approach in the async producer captures the same tradeoff, but a little more flexibly: it allows you to specify a max delay and a max accumulated message count, and data is written when either of those is hit.
> >>>>>
> >>>>> Is it possible to better capture this tradeoff? Basically I am not aware of any other trick here if you are using TCP, so I would be interested in what zeromq does if they are doing this better.
> >>>>>
> >>>>> We do indeed write each message set to the filesystem as it arrives, but we distribute messages to consumers only after the write has been flushed to disk; delaying (batching) that flush is the cause of the latency, but it also gives better use of IOPS by generating larger writes. Mmap would remove the system call (which would be good), but not the flush, I think. As you say, adding replication allows giving stronger guarantees without actually caring about durability on a particular server, which would make it possible to distribute messages to consumers after an ack from some number of other servers, irrespective of flushing to disk.
> >>>>>
> >>>>> -Jay
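A minimal sketch of the time-or-count trigger described above for the async producer: flush when either the maximum delay elapses or the maximum accumulated message count is reached. Class and field names here are illustrative assumptions, not the actual Kafka implementation.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Flush a batch when either maxDelayMs has elapsed since the first queued
    // message or maxBatchSize messages have accumulated, whichever comes first.
    public class TimeOrCountBatcher implements Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        private final long maxDelayMs;   // analogous to the producer's max delay
        private final int maxBatchSize;  // analogous to the max accumulated message count

        public TimeOrCountBatcher(long maxDelayMs, int maxBatchSize) {
            this.maxDelayMs = maxDelayMs;
            this.maxBatchSize = maxBatchSize;
        }

        public void send(byte[] message) throws InterruptedException {
            queue.put(message);
        }

        @Override
        public void run() {
            List<byte[]> batch = new ArrayList<>();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    batch.add(queue.take());                       // wait for the first message
                    long deadline = System.currentTimeMillis() + maxDelayMs;
                    while (batch.size() < maxBatchSize) {
                        long wait = deadline - System.currentTimeMillis();
                        if (wait <= 0) break;                      // max delay reached
                        byte[] m = queue.poll(wait, TimeUnit.MILLISECONDS);
                        if (m == null) break;                      // timed out waiting for more
                        batch.add(m);
                    }
                    writeBatch(batch);                             // count or delay threshold hit
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void writeBatch(List<byte[]> batch) {
            // stand-in for the network write
        }
    }

Setting maxDelayMs to 0 and maxBatchSize to 1 recovers one write per message, the zero-delay case Paul asks about in question 4 further down the thread.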
> >>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <[email protected]> wrote:
> >>>>>
> >>>>>> Jun,
> >>>>>>
> >>>>>> Thanks for your answers and the link to the paper - that helps a lot, especially the comment in the paper that 10-second end-to-end latency is good enough for your intended use case.
> >>>>>>
> >>>>>> We're looking for much lower latencies, and the basic design of Kafka feels like it should support latencies in milliseconds with a few changes. We're either going to build our own system or help develop something that already exists, so please take my comments in the constructive way they're intended. (I realize the changes I'm suggesting are outside your intended use case, but if you're interested we may be able to provide a very capable developer to help with the work, assuming we choose Kafka over the other zillion streaming systems that are coming out of the woodwork.)
> >>>>>>
> >>>>>> a. *Producer "queue.time"* - In my question 4 below, I was referring to the producer queue time. With a default value of 5 seconds, that accounts for half your end-to-end latency. A system like zeromq is optimized to write data immediately without delay, but in such a way as to minimize the number of system calls required at high message rates. Zeromq is no nirvana, but it has a number of nice properties.
> >>>>>>
> >>>>>> b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds appears to be another significant source of latency in the system, assuming that clients are unable to access data until it has been flushed. Since you have wisely chosen to take advantage of the buffer cache as part of your system design, it seems that you could remove this latency completely by memory-mapping the partitions and memcpying each message as it arrives. With the right IPC mechanism, clients could have immediate access to new messages.
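For reference, the two settings named in (a) and (b) above could be lowered along the lines of the sketch below. The keys queue.time and log.default.flush.interval.ms come from this thread; the other property name, the units, and the values are assumptions that depend on the Kafka version.

    import java.util.Properties;

    // Hypothetical low-latency settings for the two knobs discussed in (a) and (b).
    public class LowLatencySettings {
        public static void main(String[] args) {
            Properties producerProps = new Properties();
            producerProps.setProperty("producer.type", "async");  // assumed switch for the async producer
            producerProps.setProperty("queue.time", "50");        // (a) max ms a message may sit in the producer queue (default cited: 5 s)

            Properties brokerProps = new Properties();
            brokerProps.setProperty("log.default.flush.interval.ms", "50"); // (b) max ms before a flush makes data visible to consumers (default cited: 3 s)

            System.out.println("producer: " + producerProps);
            System.out.println("broker:   " + brokerProps);
        }
    }

Lower values trade the larger, less frequent writes Jay describes above for lower end-to-end latency.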
> >>>>>> c. *Batching, sync vs async, replication, and auditing* - It's understandable that you've chosen a forensic approach to producer reliability (after-the-fact auditing), but when you implement replication it would be really nice to revise the producer protocol mechanisms. If you used a streaming mechanism with producer offsets and ACKs, you could ensure reliable delivery of producer streams to multiple brokers without the need to choose a "batch size" or "queue.time". This could also give you active/active failover of brokers. It may also help in the WAN case (my question 3 below), because you will be able to adaptively stuff more and more data through the fiber on high bandwidth*delay links without having to choose a large "batch size" or incur the additional latency that entails. Oh, and it will help you deal with CRC errors once you start checking for them.
> >>>>>>
> >>>>>> d. *Performance measurements* - I'd like to make a suggestion for your performance measurements. Your benchmarks measure throughput, but a throughput number is meaningless without an associated "% CPU time". Ideally all measurements would achieve wire speed (100MB/sec) at 0% CPU (since, after all, this is plumbing, and we assume the cores in the system should have capacity set aside for useful work too). Obviously nobody ever achieves this, but by measuring it one can raise the bar in terms of optimization.
> >>>>>>
> >>>>>> Paul
> >>>>>>
> >>>>>> ps. Just for background, I am the cofounder at Quantcast, where we process 3.5PB of data per day. These questions are related to my new startup Quantbench, which will deal with financial market data where you don't want any latency at all. And WAN issues are a big deal too. Incidentally, I was also founder of Orbital Data, which was a WAN optimization company, so I've done a lot of work with protocols over long distances.
> >>>>>>
> >>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <[email protected]> wrote:
> >>>>>>
> >>>>>>> Paul,
> >>>>>>>
> >>>>>>> Excellent questions. See my answers below. Thanks,
> >>>>>>>
> >>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Kafka looks like an exciting project, thanks for opening it up.
> >>>>>>>>
> >>>>>>>> I have a few questions:
> >>>>>>>>
> >>>>>>>> 1. Are checksums end to end (i.e., created by the producer and checked by the consumer)? Or are they only used to confirm buffer-cache behavior on disk, as mentioned in the documentation? Bit errors occur vastly more often than most people assume, often because of device driver bugs. TCP only detects 1 error in 65536, so errors can flow through (if you like, I can send links to papers describing the need for checksums everywhere).
> >>>>>>>>
> >>>>>>> The checksum is generated at the producer and propagated to the broker and eventually the consumer. Currently, we only validate the checksum at the broker. We could further validate it at the consumer in the future.
> >>>>>>>
> >>>>>>>> 2. The consumer has a pretty solid mechanism to ensure it hasn't missed any messages (I like the design, by the way), but how does the producer know that all of its messages have been stored? (There is no apparent message id on that side, since the message id isn't known until the message is written to the file.) I'm especially curious how failover/replication could be implemented, and I'm thinking that acks on the publisher side may help.
> >>>>>>>
> >>>>>>> The producer-side auditing is not built in. At LinkedIn, we do it by generating an auditing event periodically in the event handler of the async producer. The auditing event contains the number of events produced in a configured window (e.g., 10 minutes) and is sent to a separate topic. The consumer can read the actual data and the auditing event and compare the counts. See our paper ( http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf ) for some more details.
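A rough sketch of the producer-side auditing described just above, with hypothetical names: count events per fixed window and periodically publish each closed window's count to a separate audit topic so a consumer can reconcile totals against what it received.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Counts produced events per fixed time window; counts for closed windows are
    // handed to a publisher (a stand-in for a send to a separate audit topic).
    public class ProducerAuditCounter {
        private final long windowMs;
        private final Map<Long, AtomicLong> countsByWindow = new ConcurrentHashMap<>();

        public ProducerAuditCounter(long windowMs) {
            this.windowMs = windowMs;        // e.g. 10 minutes = 600_000 ms
        }

        // Call once per event actually handed to the producer.
        public void recordEvent(long timestampMs) {
            long windowStart = (timestampMs / windowMs) * windowMs;
            countsByWindow.computeIfAbsent(windowStart, w -> new AtomicLong()).incrementAndGet();
        }

        // Called periodically (e.g. from the async producer's event handler).
        public void publishClosedWindows(long nowMs, AuditPublisher publisher) {
            long currentWindow = (nowMs / windowMs) * windowMs;
            countsByWindow.forEach((windowStart, count) -> {
                if (windowStart < currentWindow) {               // window is closed
                    publisher.publish(windowStart, count.get()); // emit the audit event
                    countsByWindow.remove(windowStart);
                }
            });
        }

        public interface AuditPublisher {
            void publish(long windowStartMs, long eventCount);
        }
    }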
> >>>>>>>
> >>>>>>>> 3. Has the consumer's flow control been tested over high bandwidth*delay links? (What bandwidth can you get from a London consumer of an SF cluster?)
> >>>>>>>>
> >>>>>>> Yes, we actually replicate Kafka data across data centers, using an embedded consumer in a broker. Again, there is a bit more info on this in our paper.
> >>>>>>>
> >>>>>>>> 4. What kind of performance do you get if you set the producer's message delay to zero? (i.e., is there a separate system call for each message, or do you manage to aggregate messages into a smaller number of system calls even with a delay of 0?)
> >>>>>>>
> >>>>>>> I assume that you are referring to the flush interval. One can configure it to flush every message to disk. This will slow down throughput significantly.
> >>>>>>>
> >>>>>>>> 5. Have you considered using a library like zeromq for the messaging layer instead of rolling your own? (zeromq will handle #4 cleanly at millions of messages per second and has clients in 20 languages)
> >>>>>>>
> >>>>>>> No. Our proprietary format allows us to support things like compression in the future. However, we can definitely look into the zeromq format. Is their messaging layer easily extractable?
> >>>>>>>
> >>>>>>>> 6. Do you have any plans to support intermediate processing elements the way Flume does?
> >>>>>>>
> >>>>>>> For now, we are just focusing on getting the raw messaging layer solid. We have worked a bit on stream processing and will look into that again in the future.
> >>>>>>>
> >>>>>>>> 7. The docs mention that new versions will only be released after they are in production at LinkedIn. Does that mean that the latest version of the source code is hidden at LinkedIn, and contributors would have to throw patches over the wall and wait months to get the integrated product?
> >>>>>>>
> >>>>>>> What we ran at LinkedIn is the same version that is in open source, and there is no internal repository of Kafka at LinkedIn. We plan to maintain that in the future.
> >>>>>>>
> >>>>>>>> Thanks!
