Awesome! Thanks for the confirmation, and keep up the great work on Kafka!
On Thu, Mar 28, 2013 at 9:22 AM, Jun Rao <[email protected]> wrote:

Jonathan,

With a single writer, the producer can achieve exactly-once writes. If a send request fails, the producer first checks the end of the log to see whether the previous write succeeded, and resends only if it did not.

To do this, the producer needs the offsets of the appended messages. In 0.8, such offsets are not yet returned by our high-level producer API. We plan to extend the producer API post-0.8 to expose this information.

Thanks,

Jun

On Wed, Mar 27, 2013 at 2:41 PM, Jonathan Hodges <[email protected]> wrote:

I know this is a really old thread, but it looked like the only pertinent one that came up when searching for 'exactly once' in the archives. I just want to confirm my understanding of the 0.8 version: it still doesn't completely support exactly-once semantics. With the producer configured in sync mode and quorum commits, there are still some edge-case failure modes where the producer won't receive the ack and will resend the message(s). I think I read that consumers don't see uncommitted messages in the log, but I don't think that addresses this producer case. Please correct me if I am missing something here.

Don't get me wrong, we are very thankful for the 0.8 features. Kafka offers by far the best message-delivery guarantees of the products we evaluated, like Rabbit and ActiveMQ. We attempt to make our downstream consumer processes idempotent to mitigate this edge case, but it isn't always feasible. Also, Milind's suggestion in this thread of using Storm for exactly-once guarantees makes a lot of sense. Trident State seems to address this very issue (https://github.com/nathanmarz/storm/wiki/Trident-state), so we could just have it mediate the topics that require exactly-once.
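[Editor's note: Jun's single-writer scheme can be sketched in a few lines. This is only an illustration with an in-memory stand-in for the partition log and invented helper names; as Jun notes, the 0.8 high-level producer API does not yet expose the offsets a real implementation would rely on.]

```python
# Sketch of exactly-once appends with a single writer: on a failed or
# timed-out send, re-read the tail of the log to decide whether to resend.
# The "broker" here is just an in-memory list standing in for a partition.

class FlakyBroker:
    """Appends always land, but the ack may be 'lost' (simulated by raising)."""
    def __init__(self):
        self.log = []

    def append(self, msg, lose_ack=False):
        self.log.append(msg)
        if lose_ack:
            raise TimeoutError("ack lost")  # the write landed, the ack did not
        return len(self.log) - 1            # offset of the appended message

    def last_message(self):
        return self.log[-1] if self.log else None

def send_exactly_once(broker, msg, lose_ack=False):
    """Send msg; on failure, resend only if the tail shows it never landed.
    Comparing payloads only works for distinguishable messages; a real
    implementation would compare offsets or sequence numbers instead."""
    try:
        return broker.append(msg, lose_ack=lose_ack)
    except TimeoutError:
        if broker.last_message() == msg:   # previous write did succeed
            return len(broker.log) - 1     # treat as acknowledged
        return broker.append(msg)          # genuinely lost: resend

broker = FlakyBroker()
send_exactly_once(broker, "m1")
send_exactly_once(broker, "m2", lose_ack=True)  # ack lost, but no duplicate
assert broker.log == ["m1", "m2"]
```

Note this only holds with a single writer: a second producer appending concurrently would make the tail check meaningless.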
-Jonathan

On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <[email protected]> wrote:

Why wouldn't the Storm approach provide exactly-once delivery semantics? https://github.com/nathanmarz/storm

Nathan actually credits the Kafka devs for the basic idea of transaction persisting in one of his talks.

Regards
Milind

On Nov 3, 2012 11:51 AM, "Rohit Prasad" <[email protected]> wrote:

I agree that this approach only prevents duplicate messages to a partition from the producer side. There needs to be a similar approach on the consumer side too. Using ZK can be one solution; there are non-ZK approaches as well.

Even if the consumer reads none or all of the messages of a transaction, that does not solve the transaction problem yet, because the business/application logic inside the consumer thread may execute partially and then fail. So it becomes tricky to decide the point at which you can say you have "consumed" a message and advance the consumption offset. If your consumer thread is saving some value into a DB, HDFS, etc., ideally you want that save operation and the consumption-offset increment to happen atomically. That's why it boils down to the application logic implementing transactions and dealing with duplicates. Maybe a journaling or redo-log approach on the consumer side can help build such a system.

It would be nice if Kafka could eventually be a transport that provides "exactly once" message-delivery semantics. Then consumer threads could be sure they receive each message once, and could build application logic on top of that.

I have a use case similar to what Jay mentioned in a previous mail.
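[Editor's note: Rohit's point that the save operation and the offset increment should happen atomically is usually addressed by putting both writes in one transaction of the destination store. A minimal sketch with SQLite standing in for the downstream DB; the schema and names are invented for illustration.]

```python
import sqlite3

# Store the consumer's output and its input offset in ONE transaction, so a
# crash between "save result" and "advance offset" can never split them.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (msg_offset INTEGER PRIMARY KEY, value TEXT)")
db.execute("CREATE TABLE progress (part INTEGER PRIMARY KEY, next_offset INTEGER)")
db.execute("INSERT INTO progress VALUES (0, 0)")
db.commit()

def consume(partition, offset, value):
    """Process one message; idempotent thanks to the PRIMARY KEY on msg_offset."""
    with db:  # one transaction: both writes commit together or not at all
        db.execute("INSERT OR IGNORE INTO results VALUES (?, ?)",
                   (offset, value))
        # MAX() guards against a late redelivery moving the offset backwards.
        db.execute("UPDATE progress SET next_offset = MAX(next_offset, ?) "
                   "WHERE part = ?", (offset + 1, partition))

consume(0, 0, "a")
consume(0, 0, "a")  # redelivery after a crash: harmless
consume(0, 1, "b")
```

On restart, the consumer reads `next_offset` from the same store and resumes from there, so its position can never disagree with its saved results.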
I want to do aggregation, but I want the aggregated data to be correct, possibly avoiding duplicates in case of failures/crashes.

On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <[email protected]> wrote:

That approach allows a producer to prevent duplicate messages to the partition, but what about the consumer? In my case, I don't want the consumer to be able to read any of the messages unless it can read all of the messages from a transaction.

I also like the idea of there being multiple types of Kafka transaction, though, just to accommodate different performance, reliability, and consumption patterns. Of course, the added complexity of that might just sink the whole thing.

--Tom

On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <[email protected]> wrote:

Getting transactional support is quite a hard problem. There will always be corner cases where the solution will not work, unless you want to go down the path of 2PC, Paxos, etc., which of course would degrade Kafka's performance. It is best to reconcile data and deal with duplicate messages in the application layer. Having said that, it would be amazing if we could build "at most once" semantics into Kafka!

Regarding the above approaches: the producer will always have a doubt whether its commit went through, e.g. if the ack for the "commit" is not received, or if the producer dies immediately after calling commit. When it is restarted, how does it know whether the last operation went through?

I suggest the following:
1. The producer attaches a timestamp to the beginning of each message and sends it to the server.
2. On restarts/timeouts/re-connections, the producer first reads the last committed message from the leader of the partition.
3. From the timestamp, it can tell how many messages went through before it died (or before the connection was broken), and it can infer how many messages to replay.

This approach can be used with the existing Kafka libraries, since you can have a producer and a consumer thread together in one application to implement the logic. Or someone could take the initiative to write a transactional producer (which internally has both a producer and a consumer to read the last committed message). I will be developing one for Kafka 0.8 in C++.

The approach works even if you batch messages for a single partition, but it works only if a single producer is writing to a partition. I want to hear opinions on this approach; I'm sure there are corner cases where it may break.

If there are multiple producers to a partition, then some bookkeeping on the server side, keyed by a "correlation id" that identifies each unique producer, may be needed to track the last message committed per producer.

Regards,
Rohit

On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <[email protected]> wrote:

If you use Kafka just as a redo log, you can't undo anything that's written to the log.
Write-ahead logs in typical database systems are both redo and undo logs; transaction commits and rollbacks are implemented on top of them. However, general-purpose write-ahead logs for transactions are much more complicated.

Thanks,

Jun

On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <[email protected]> wrote:

This is an important feature, and I am interested in helping out with the design and implementation, though I am working on 0.8 features for the next month, so I may not be of too much use. I have thought a little bit about this, but I am not yet sure of the best approach.

Here is a specific use case I think is important to address: consider a case where you are processing one or more streams and producing an output stream. This processing may involve some kind of local state (say counters or other intermediate aggregation state). This is a common scenario. The problem is to give reasonable semantics to this computation in the presence of failures. The processor effectively has a position/offset in each of its input streams as well as whatever local state. The problem is that if this process fails, it needs to restore to a state that matches the last produced messages. There are several solutions to this problem.
One is to make the output somehow idempotent; this will solve some cases, but it is not a general solution, as many things cannot easily be made idempotent.

I think the two proposals you give outline a couple of basic approaches:
1. Store the messages on the server somewhere, but don't add them to the log until the commit call.
2. Store the messages in the log, but don't make them available to the consumer until the commit call.
Another option you didn't mention:

There are several subtleties to these approaches.

One advantage of the second approach is that the messages are in the log whether or not they are available for reading. This makes it possible to support a kind of "dirty read" that lets the consumer choose between seeing all messages immediately, with low latency but potentially including uncommitted messages, and seeing only committed messages.

The problem with the second approach, at least as you describe it, is that you have to lock the log until the commit occurs; otherwise you can't roll back (because someone else may have appended their own messages, and you can't truncate the log). This would have all the problems of remote locks. I think this might be a deal-breaker.

Another variation on the second approach would be the following: have each producer maintain an id and generation number.
Keep a schedule of valid offset/id/generation numbers on the broker and only hand those out. This solution would support non-blocking multi-writer appends, but it requires more participation from the producer (i.e. obtaining a generation number and id).

Cheers,

-Jay

On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <[email protected]> wrote:

I have come up with two different possibilities, each with different trade-offs.

The first would be to support "true" transactions by writing transactional data into a temporary file and then copying it directly to the end of the partition when the commit command arrives. The upside of this approach is that individual transactions can be larger than a single batch, and more than one producer can conduct transactions at once. The downside is the extra I/O involved in writing the data to disk and reading it back an extra time.

The second would be to allow any number of messages to be appended to a topic, but not to move the "end of topic" offset until the commit is received. If a rollback is received, or the producer times out, the partition can be truncated at the most recently recognized "end of topic" offset. The upside is that there is very little extra I/O (only storing the official "end of topic" metadata), and it seems like it should be easy to implement.
The downside is that this "transaction" feature is incompatible with anything but a single producer per partition.

I am interested in your thoughts on these.

--Tom

On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <[email protected]> wrote:

On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:

> The closest concept of a transaction on the publisher side that I can
> think of is using a batch of messages in a single call to the
> synchronous producer.
>
> Precisely: you can configure a Kafka producer to use "sync" mode and
> batch the messages that require transactional guarantees in a single
> send() call. That will ensure that either all the messages in the
> batch are sent or none.

This is an interesting feature -- something I wasn't aware of. Still, it doesn't solve the problem *completely*. As many people realise, it's still possible for the batch of messages to get into Kafka fine, but for the ack from Kafka to be lost on its way back to the producer. In that case the producer erroneously believes the messages didn't get in, and might re-send them.

You guys *haven't* solved that issue, right? I believe you write about it on the Kafka site.
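[Editor's note: the lost-ack duplicate Philip describes is exactly what Jay's "id and generation number" variation earlier in the thread would catch: if the broker remembers the last sequence it accepted from each producer, a blind resend is recognized and dropped. Nothing like this exists in Kafka 0.8; the protocol and names below are invented to illustrate the idea.]

```python
# Broker-side dedup sketch: track the highest sequence number accepted per
# producer id, and drop any append whose sequence was already seen.

class DedupBroker:
    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id, seq, msg):
        if seq <= self.last_seq.get(producer_id, -1):
            return "duplicate"        # resend of an already-applied write
        self.log.append(msg)
        self.last_seq[producer_id] = seq
        return len(self.log) - 1      # offset of the appended message

broker = DedupBroker()
broker.append("p1", 0, "m1")
broker.append("p1", 1, "m2")
broker.append("p1", 1, "m2")  # producer resends after losing the ack
assert broker.log == ["m1", "m2"]
```

Because the dedup state is per producer id, this supports multiple writers per partition, at the cost of the producer having to obtain an id and maintain a sequence.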
> Thanks,
> Neha
>
> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <[email protected]> wrote:
>
> > Is there an accepted or recommended way to make writes to a Kafka
> > queue idempotent, or part of a transaction?
> >
> > I can configure my system such that each queue has exactly one producer.
> >
> > (If there are no accepted/recommended ways, I have a few ideas I would
> > like to propose. I would also be willing to implement them if needed.)
> >
> > Thanks in advance!
> >
> > --Tom

--
Philip O'Toole

Senior Developer
Loggly, Inc.
San Francisco, Calif.
www.loggly.com

Come join us!
http://loggly.com/company/careers/
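[Editor's note: Tom's second proposal (and Jay's approach 2) maps naturally onto a log with a movable "committed end" marker: appends go past it, readers never see beyond it, commit advances it, and rollback (or a producer timeout) truncates back to it. The sketch below is illustrative only; the class and method names are invented.]

```python
class TxnLog:
    """Single-producer transactional log: consumers read only up to
    committed_end; commit advances it, rollback truncates back to it."""
    def __init__(self):
        self.log = []
        self.committed_end = 0  # offset of the first uncommitted message

    def append(self, msg):
        self.log.append(msg)            # appended but not yet visible

    def commit(self):
        self.committed_end = len(self.log)

    def rollback(self):                 # also invoked on producer timeout
        del self.log[self.committed_end:]

    def read_committed(self):
        return self.log[:self.committed_end]

log = TxnLog()
log.append("a"); log.append("b")
assert log.read_committed() == []   # uncommitted messages are invisible
log.commit()
log.append("c")
log.rollback()                      # producer died mid-transaction
assert log.read_committed() == ["a", "b"]
```

The truncation in `rollback` is also why this scheme needs a single producer per partition, as Tom notes: with two writers interleaved in the log, truncating one transaction would destroy the other's messages too.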
