If the unflushed messages haven't been acked to the publisher, they haven't been lost in the system.
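For concreteness, here is a minimal sketch of the behavior being proposed in this thread, in Java with purely hypothetical names (this is not Kafka code): appended messages become readable by consumers immediately, while the producer's ack is deferred until the flush, so a message lost before flushing is simply an unacked message the publisher can resend.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    class SketchLog {
        private final List<byte[]> messages = new ArrayList<>();                     // readable by consumers right away
        private final List<CompletableFuture<Void>> pendingAcks = new ArrayList<>(); // producer acks held until flush

        // Producer path: the returned future completes only after flush() has run.
        synchronized CompletableFuture<Void> append(byte[] message) {
            messages.add(message);
            CompletableFuture<Void> ack = new CompletableFuture<>();
            pendingAcks.add(ack);
            return ack;
        }

        // Consumer path: unflushed messages are already visible.
        synchronized List<byte[]> readFrom(int offset) {
            return new ArrayList<>(messages.subList(offset, messages.size()));
        }

        // Run on the flush schedule: fsync the segment (elided here), then ack the producers.
        synchronized void flush() {
            for (CompletableFuture<Void> ack : pendingAcks) {
                ack.complete(null);
            }
            pendingAcks.clear();
        }
    }

The only point of the sketch is the ordering of events: visibility to consumers does not wait for the fsync, but the publisher's ack does.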
On Wed, Jul 20, 2011 at 9:09 AM, Jun Rao <[email protected]> wrote:

Paul,

The only concern is that if we expose unflushed messages, those messages could disappear after a broker machine restart.

Jun

On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <[email protected]> wrote:

One more suggestion:

Even before you have replication, it seems that you could delay producer-side acks until after the data is recorded to disk, and still pass the data forward to consumers immediately.

On Jul 19, 2011, at 12:23 PM, Jay Kreps <[email protected]> wrote:

Agreed, no reason the policy to hand out messages should not be configurable. We were hoping to make the whole question irrelevant with replication, since then the producer can choose the replication level it wants and fsync durability should be less of a concern.

I agree with your comment about a good implementation of streaming with acks being potentially superior.

-Jay

On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <[email protected]> wrote:

Jay,

Ah - thanks for the clarification on the delay in the broker. It would be nice if that were a configuration option, so that the end user can choose to forward only messages that have been written to disk, or to have the data forwarded immediately. When you implement replication, data hitting the disk will matter less.

On the delay in the producer, I think it could best be resolved through measurement. In your paper you compare two different approaches, and I'm proposing a third:

1. Send and wait (single message, JMS style)
2. Batch, send, and wait (Kafka today)
3. Stream with ACKs

Removing any wait for a reply should increase throughput, not decrease it, so you're likely trading latency against potential CPU efficiency. And the CPU savings are a question best resolved by measurement.

I'd also encourage you to think about the WAN case. When you send-and-wait, you need to send a buffer that is much larger than the bandwidth*delay product to approach full line utilization, and the line will go idle for one RTT while you stop to wait for a reply. The bandwidth*delay product can get large (tens of megabytes), and end users will rarely understand the need to tune the batch size to increase throughput. They'll just say it's slow over long distances.

All that said - your use case doesn't require minimizing latency or WAN use, so I can really understand if this isn't a priority for you.

It's a well designed product that has had some real thought put into it. It's a really promising system; thanks for taking the time to respond to my comments.

Paul

On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <[email protected]> wrote:

Ah, I think what you are describing in zeromq is essentially the equivalent of group commit for the socket. Essentially you wait until the socket is no longer writable and then begin to queue data. This is an interesting idea.
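A rough sketch of that pattern, assuming a dedicated sender thread and a simple blocking queue (names are illustrative, not zeromq's actual internals): messages pile up while a write is in flight, and whatever has accumulated goes out in the next write.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class GroupCommitSender implements Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        private final OutputStream socketOut;

        GroupCommitSender(OutputStream socketOut) {
            this.socketOut = socketOut;
        }

        // Callers never block on the network; they just enqueue.
        void send(byte[] message) {
            queue.add(message);
        }

        @Override
        public void run() {
            List<byte[]> batch = new ArrayList<>();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    batch.add(queue.take());   // wait for at least one message
                    queue.drainTo(batch);      // grab everything that queued up meanwhile
                    int total = 0;
                    for (byte[] m : batch) total += m.length;
                    byte[] coalesced = new byte[total];
                    int pos = 0;
                    for (byte[] m : batch) {
                        System.arraycopy(m, 0, coalesced, pos, m.length);
                        pos += m.length;
                    }
                    socketOut.write(coalesced);  // one write for the whole accumulated batch
                    socketOut.flush();
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }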
Of course it would only have a positive effect when you had already overflowed the socket buffer and were sending a very high throughput of small messages. That basically is a way to degrade an overloaded synchronous send into a batched send. This is not really the same as what we have done, which is to allow trading off latency for throughput in a configurable manner. The reason the latter is important is that we do not have a handful of producers sending at a rate that saturates the network I/O capacity of those servers (the case where the group commit would help), but rather thousands of producers sending at a medium-to-low volume, so we would never hit that case. The advantage of batching is fewer requests hitting the server, and larger packets. Where the group commit would help is the synchronous producer benchmarks, where you could potentially get much better throughput. This is something we should consider adding.

To be clear, though, we have not added latency in our layer, just a configurable way to trade off latency for throughput. This is unambiguously a good thing, I think.

With respect to mmap, I think you are misunderstanding where the latency comes from. We immediately write data to the filesystem with no delay whatsoever. This incurs the overhead of a system call, as you point out, which could be avoided by mmap, but that doesn't add much in the way of latency. The latency comes from the fact that we do not make the written data available to consumers until we fsync the file to "ensure" the durability of consumed messages. The frequency of the fsync is configurable: either immediate, or with a time or message-count threshold. This again trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <[email protected]> wrote:

*Producer latency* - I'm not familiar with zeromq internals, but my understanding is that they send the first message(s) immediately, and as TCP queues up the data it will eventually block as the send buffer fills; during this time messages can queue up, and the net effect is that on average the number of system calls is much smaller than the number of messages. The key is having a separate thread for network operations with very efficient thread coordination. Clearly Nagle is disabled (TCP_NODELAY), as Nagle is a blight against humanity.

Having any sort of delay adds latency. If every developer thinks it's OK to add a little latency in his layer, pretty soon you end up with 10-second end-to-end latency.

Having an "accumulated message count" is also bad for WAN performance. If your "window size" is a set of delayed messages, the only way to deal with a large bandwidth*delay product is to delay a lot of messages, then send them. You can fit a lot of data into a fiber.
Imagine a gigabit link with 100ms roundtrip time: you can store roughly 100 megabits (about 12MB) in the fiber, and you need multiples of that for buffering if you need to do a retransmit.

*Broker latency* - With mmap, the memcpy() of the message should make the data available to a thread even in another process; the pages you have mapped are also in the buffer cache and available to a sendfile() call, or at least I think so. The flush to physical disk (or msync() in this case) would still be delayed, but without impacting end-to-end latency.

That said, in benchmarks I have done, the fastest IO with the lowest CPU overhead is unbuffered (direct) IO (which is lower overhead than using the buffer cache with or without memory mapping), but then you'd have to manage your own buffer pool and run your broker in a single multithreaded process. That's getting more extreme. Just getting rid of this buffer-write delay by using memory mapping will remove a big chunk of latency.

On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <[email protected]> wrote:

Hi Paul,

We are definitely interested in lowering latency--lower is always better--but that has not been a major concern for us so far (we were replacing a system with 1-hour latency), so we haven't focused on it yet. As you describe, latency in our setup at LinkedIn comes from batching on the frontend and batching on the Kafka servers due to very lenient flush settings.

I am interested in your comments on zeromq. Do they actually have a better approach for this problem even when using TCP? If so, I would be interested to understand it. The way I see it, this is about trading throughput against latency. On the producer side you have only a few options: immediately write the data to the socket buffer for sending, or wait and see if the application writes more data. The OS will do the latter for you unless you set TCP_NODELAY, but the OS is relatively inflexible; it doesn't understand your data, so I think it just waits 200ms or until the socket buffer is full.

The current approach in the async producer captures the same tradeoff, but a little more flexibly: it allows you to specify a max delay and a max accumulated message count, and data is written when either of those is hit.

Is it possible to better capture this tradeoff? Basically I am not aware of any other trick here if you are using TCP, so I would be interested in what zeromq does if they are doing this better.

We do indeed write each message set to the filesystem as it arrives, but we distribute messages to consumers only after the write has been flushed to disk. Delaying (batching) that flush is the cause of the latency, but it also makes better use of IOPS by generating larger writes.
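For what it's worth, a sketch of that flush policy (thresholds and names are illustrative; the thread mentions log.default.flush.interval.ms and a message-count trigger as the actual knobs): a dirty segment is fsync'd, and its messages handed to consumers, once either enough messages have accumulated or the data has been dirty for long enough.

    class FlushPolicy {
        private final long flushIntervalMessages;   // flush after N appended messages
        private final long flushIntervalMs;         // or after T ms of dirty data

        private long unflushedMessages = 0;
        private long lastFlushMs = System.currentTimeMillis();

        FlushPolicy(long flushIntervalMessages, long flushIntervalMs) {
            this.flushIntervalMessages = flushIntervalMessages;
            this.flushIntervalMs = flushIntervalMs;
        }

        // Called after each append; returns true when the segment should be fsync'd
        // (and, per the discussion above, when its messages become consumer-visible).
        boolean recordAppendAndCheck() {
            unflushedMessages++;
            boolean byCount = unflushedMessages >= flushIntervalMessages;
            boolean byTime = System.currentTimeMillis() - lastFlushMs >= flushIntervalMs;
            return byCount || byTime;
        }

        void markFlushed() {
            unflushedMessages = 0;
            lastFlushMs = System.currentTimeMillis();
        }
    }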
Mmap would remove the system call (which would be good), but not the flush, I think. As you say, adding replication allows giving stronger guarantees without actually caring about durability on a particular server, which would make it possible to distribute messages to consumers after an ack from some number of other servers, irrespective of flushing to disk.

-Jay

On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <[email protected]> wrote:

Jun,

Thanks for your answers and the link to the paper - that helps a lot, especially the comment in the paper that 10-second end-to-end latency is good enough for your intended use case.

We're looking for much lower latencies, and the basic design of Kafka feels like it should support latencies in milliseconds with a few changes. We're either going to build our own system or help develop something that already exists, so please take my comments in the constructive way they're intended (I realize the changes I'm suggesting are outside your intended use case, but if you're interested we may be able to provide a very capable developer to help with the work, assuming we choose Kafka over the other zillion streaming systems coming out of the woodwork).

a. *Producer "queue.time"* - In my question 4 below, I was referring to the producer queue time. With a default value of 5 seconds, that accounts for half your end-to-end latency. A system like zeromq is optimized to write data immediately, without delay, but in such a way as to minimize the number of system calls required at high message throughput. Zeromq is no nirvana, but it has a number of nice properties.

b. *Broker "log.default.flush.interval.ms"* - The default value of 3 seconds appears to be another significant source of latency in the system, assuming that clients are unable to access data until it has been flushed. Since you have wisely chosen to take advantage of the buffer cache as part of your system design, it seems you could remove this latency completely by memory-mapping the partitions and memcpying each message as it arrives. With the right IPC mechanism, clients could have immediate access to new messages.

c. *Batching, sync vs async, replication, and auditing* - It's understandable that you've chosen a forensic approach to producer reliability (after-the-fact auditing), but when you implement replication it would be really nice to revise the producer protocol mechanisms.
If you used a streaming mechanism with producer offsets and ACKs, you could ensure reliable delivery of producer streams to multiple brokers without the need to choose a "batch size" or "queue.time". This could also give you active/active failover of brokers. It may also help in the WAN case (my question 3 below), because you would be able to adaptively stuff more and more data through the fiber on high bandwidth*delay links without having to choose a large "batch size" or incur the additional latency that entails. Oh, and it will help you deal with CRC errors once you start checking for them.

d. *Performance measurements* - I'd like to make a suggestion for your performance measurements. Your benchmarks measure throughput, but a throughput number is meaningless without an associated "% CPU time". Ideally all measurements would achieve wire speed (100MB/sec) at 0% CPU (since, after all, this is plumbing, and we assume the cores in the system should have capacity set aside for useful work too). Obviously nobody ever achieves this, but by measuring it one can raise the bar in terms of optimization.

Paul

ps. Just for background, I am the cofounder at Quantcast, where we process 3.5PB of data per day. These questions are related to my new startup, Quantbench, which will deal with financial market data, where you don't want any latency at all. And WAN issues are a big deal too. Incidentally, I was also founder of Orbital Data, a WAN optimization company, so I've done a lot of work with protocols over long distances.

On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <[email protected]> wrote:

Paul,

Excellent questions. See my answers below. Thanks,

On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <[email protected]> wrote:

> Kafka looks like an exciting project, thanks for opening it up.
>
> I have a few questions:
>
> 1. Are checksums end to end (i.e., created by the producer and checked by the consumer), or are they only used to confirm buffer-cache behavior on disk, as mentioned in the documentation? Bit errors occur vastly more often than most people assume, often because of device driver bugs. TCP's 16-bit checksum misses roughly 1 error in 65536, so errors can flow through (if you like, I can send links to papers describing the need for checksums everywhere).

The checksum is generated at the producer and propagated to the broker and eventually the consumer. Currently, we only validate the checksum at the broker. We could further validate it at the consumer in the future.
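As an aside, the consumer-side half of that check is small. Here is a sketch assuming a CRC32 carried with each message (names are illustrative, not the actual Kafka wire format):

    import java.util.zip.CRC32;

    final class ChecksumCheck {
        // CRC32 over the payload, as the producer would compute it before sending.
        static long crcOf(byte[] payload) {
            CRC32 crc = new CRC32();
            crc.update(payload, 0, payload.length);
            return crc.getValue();
        }

        // Consumer-side validation: recompute and compare against the CRC that travelled with the message.
        static boolean validAtConsumer(byte[] payload, long producerCrc) {
            return crcOf(payload) == producerCrc;
        }
    }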
> 2. The consumer has a pretty solid mechanism to ensure it hasn't missed any messages (I like the design, by the way), but how does the producer know that all of its messages have been stored? There is no apparent message id on that side, since the message id isn't known until the message is written to the file. I'm especially curious how failover/replication could be implemented, and I'm thinking that acks on the publisher side may help.

Producer-side auditing is not built in. At LinkedIn, we do that by generating an auditing event periodically in the event handler of the async producer. The auditing event contains the number of events produced in a configured window (e.g., 10 minutes) and is sent to a separate topic. The consumer can read the actual data and the auditing event and compare the counts. See our paper ( http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf ) for some more details.

> 3. Has the consumer's flow control been tested over high bandwidth*delay links? (What bandwidth can you get from a London consumer of an SF cluster?)

Yes, we actually replicate Kafka data across data centers, using an embedded consumer in a broker. Again, there is a bit more info on this in our paper.

> 4. What kind of performance do you get if you set the producer's message delay to zero? (i.e., is there a separate system call for each message, or do you manage to aggregate messages into a smaller number of system calls even with a delay of 0?)

I assume that you are referring to the flush interval. One can configure the broker to flush every message to disk. This will slow down throughput significantly.

> 5. Have you considered using a library like zeromq for the messaging layer instead of rolling your own? (zeromq will handle #4 cleanly at millions of messages per second and has clients in 20 languages)

No. Our proprietary format allows us to support things like compression in the future. However, we can definitely look into the zeromq format. Is their messaging layer easily extractable?

> 6. Do you have any plans to support intermediate processing elements the way Flume supports?

For now, we are just focusing on getting the raw messaging layer solid.
We have worked a bit on stream processing and will look into that again in the future.

> 7. The docs mention that new versions will only be released after they are in production at LinkedIn. Does that mean that the latest version of the source code is hidden at LinkedIn, and contributors would have to throw patches over the wall and wait months to get the integrated product?

What we run at LinkedIn is the same version that is in open source; there is no internal repository of Kafka at LinkedIn. We plan to maintain that in the future.

> Thanks!
