Awesome! Thanks for the confirmation, and continued great work on Kafka!

On Thu, Mar 28, 2013 at 9:22 AM, Jun Rao <[email protected]> wrote:

> Jonathan,
>
> With a single writer, the producer can achieve exactly-once writes. If a
> send request fails, the producer first checks the end of the log to see
> whether the previous write succeeded or not. The producer will only resend
> if the previous write failed.
>
> To do this, the producer needs the offset of appended messages. In 0.8,
> such offsets are not returned in our high level producer API yet. We plan
> to extend our producer API post 0.8 to expose this information.
>
> Thanks,
>
> Jun
>
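A minimal sketch of the single-writer resend check Jun describes above, in plain Python with invented names (the real producer and broker APIs differ; this only models the decision logic):

```python
class Log:
    """Stand-in for a partition log; an append can land but lose its ack."""
    def __init__(self):
        self.messages = []

    def append(self, msg, *, lose_ack=False):
        self.messages.append(msg)           # the write lands on the broker
        if lose_ack:
            raise TimeoutError("ack lost")  # but the producer never hears back

    def last_offset(self):
        return len(self.messages) - 1


def send_exactly_once(log, msg, *, lose_ack=False):
    expected = log.last_offset() + 1        # offset this append should receive
    try:
        log.append(msg, lose_ack=lose_ack)
    except TimeoutError:
        # Ambiguous failure: check the end of the log before resending.
        if log.last_offset() < expected:
            log.append(msg)                 # the previous write really failed
        # otherwise the write landed and only the ack was lost: do not resend
    return log.last_offset()
```

A lost ack after a successful append triggers the check rather than a blind resend, so no duplicate is written. This is why the approach needs a single writer: with two producers, the tail offset no longer identifies whose write landed.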
> On Wed, Mar 27, 2013 at 2:41 PM, Jonathan Hodges <[email protected]>
> wrote:
>
> > I know this is a really old thread, but it looked like the only pertinent
> > one that came up when searching for ‘exactly once’ in the archives. I just
> > want to confirm my understanding of the 0.8 version: it still doesn’t
> > completely support exactly-once semantics. With the producer configured in
> > sync mode and quorum commits, there are still some edge-case failure modes
> > where the producer won’t receive the ack and resends the message(s). I
> > think I read that consumers don’t see uncommitted messages in the log, but
> > I don’t think that addresses this producer case. Please correct me if I am
> > missing something here.
> >
> >
> > Don’t get me wrong, we are very thankful for the 0.8 features. It offers
> > by far the best message delivery guarantees out of the products we
> > evaluated, like Rabbit and ActiveMQ. We attempt to make our downstream
> > consumer processes idempotent to mitigate this edge case, but it isn’t
> > always feasible. Also, the suggestion by Milind in this thread of using
> > Storm for exactly-once guarantees makes a lot of sense. Trident State
> > seems to address this very issue
> > (https://github.com/nathanmarz/storm/wiki/Trident-state), so we could
> > just have it mediate our topics that required exactly once.
> >
> >
> > -Jonathan
> >
> >
> >
> > On Sat, Nov 3, 2012 at 1:53 PM, Milind Parikh <[email protected]> wrote:
> >
> > > Why wouldn't the storm approach provide semantics of exactly once
> > > delivery? https://github.com/nathanmarz/storm
> > >
> > > Nathan actually credits the Kafka devs for the basic idea of
> > > transaction persisting in one of his talks.
> > >
> > > Regards
> > > Milind
> > >
> > > On Nov 3, 2012 11:51 AM, "Rohit Prasad" <[email protected]> wrote:
> > >
> > > > I agree that this approach only prevents duplicate messages to a
> > > > partition from the Producer side. There needs to be a similar approach
> > > > on the consumer side too. Using ZK can be one solution; there are
> > > > other, non-ZK approaches as well.
> > > >
> > > > Even if the Consumer reads none or all of the messages of a
> > > > transaction, that does not solve the transaction problem yet, because
> > > > the business/application logic inside the Consumer thread may execute
> > > > partially and fail. So it becomes tricky to decide the point at which
> > > > you want to say that you have "consumed" the message and increase the
> > > > consumption offset. If your consumer thread is saving some value into
> > > > DB/HDFS/etc., ideally you want this save operation and the consumption
> > > > offset increment to happen atomically. That's why it boils down to
> > > > application logic implementing transactions and dealing with
> > > > duplicates. Maybe a journalling or redo-log approach on the Consumer
> > > > side can help build such a system.
> > > >
> > > > It will be nice if eventually Kafka can be a transport which provides
> > > > "exactly once" semantics for message delivery. Then consumer threads
> > > > can be sure that they receive each message once, and can build
> > > > application logic on top of that.
> > > >
> > > > I have a use case similar to what Jay mentioned in a previous mail. I
> > > > want to do aggregation but want the aggregated data to be correct,
> > > > possibly avoiding duplicates in case of failures/crashes.
> > > >
> > > >
> > > >
> > > > On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <[email protected]> wrote:
> > > >
> > > > > That approach allows a producer to prevent duplicate messages to
> > > > > the partition, but what about the consumer? In my case, I don't
> > > > > want the consumer to be able to read any of the messages unless it
> > > > > can read all of the messages from a transaction.
> > > > >
> > > > > I also like the idea of there being multiple types of Kafka
> > > > > transaction, though, just to accommodate different performance,
> > > > > reliability, and consumption patterns. Of course, the added
> > > > > complexity of that might just sink the whole thing.
> > > > >
> > > > > --Tom
> > > > >
> > > > > On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <[email protected]> wrote:
> > > > > > Getting transactional support is quite a hard problem. There
> > > > > > will always be corner cases where the solution will not work,
> > > > > > unless you want to go down the path of 2PC, Paxos, etc., which of
> > > > > > course will degrade Kafka's performance. It is best to reconcile
> > > > > > data and deal with duplicate messages in the Application layer.
> > > > > > Having said that, it would be amazing if we can build "exactly
> > > > > > once" semantics in Kafka!!
> > > > > >
> > > > > > Regarding the above approaches:
> > > > > > The producer will always have a doubt whether its commit went
> > > > > > through, i.e. if the ack for "commit" is not received by the
> > > > > > producer, or if the producer dies immediately after calling the
> > > > > > commit. When it is restarted, how does it know whether the last
> > > > > > operation went through?
> > > > > >
> > > > > > I suggest the following:
> > > > > > 1. The producer should attach a timestamp at the beginning of
> > > > > > each message and send it to the Server.
> > > > > > 2. On restarts/timeouts/re-connections, the producer should first
> > > > > > read the last committed message from the leader of the partition.
> > > > > > 3. From the timestamp, it can know how many messages went through
> > > > > > before it died (or the connection was broken), and it can infer
> > > > > > how many messages to replay.
> > > > > >
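As a toy illustration of steps 2 and 3 above (plain Python, all names invented, not a Kafka API): after a reconnect, the producer compares the timestamp of the last committed message on the broker against the messages whose acks it never saw, and replays only the later ones:

```python
def messages_to_replay(pending, broker_log):
    """Decide what to resend after a reconnect.

    pending:    (timestamp, payload) pairs sent before the failure, oldest
                first, whose acks never arrived.
    broker_log: (timestamp, payload) pairs that actually reached the
                partition, in append order.
    """
    # Step 2: read the last committed message from the partition leader.
    last_ts = broker_log[-1][0] if broker_log else -1
    # Step 3: anything stamped after it never made it and must be replayed.
    return [(ts, m) for ts, m in pending if ts > last_ts]
```

For example, with pending messages stamped 1, 2, 3 and a broker log ending at timestamp 2, only the message stamped 3 is replayed. As noted below, this works only while a single producer writes to the partition, since the log tail must belong to this producer.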
> > > > > > The above approach can be used with the existing Kafka libraries,
> > > > > > since you can have a producer and a consumer thread together in an
> > > > > > application to implement this logic. Or someone can take the
> > > > > > initiative to write a transactional producer (which internally has
> > > > > > both a producer and a consumer to read the last committed
> > > > > > message). I will be developing one for Kafka 0.8 in C++.
> > > > > >
> > > > > > The above approach will work even if you batch messages for a
> > > > > > single partition. It will work only if a single producer is
> > > > > > writing to a partition. I want to hear opinions about the above
> > > > > > approach; I am sure there can be corner cases where it may break.
> > > > > >
> > > > > > If there are multiple producers to a partition, then some
> > > > > > book-keeping on the server side with regard to the last message
> > > > > > committed from a "correlation id" (to identify the unique
> > > > > > producer) may be needed.
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Rohit
> > > > > >
> > > > > >
> > > > > > On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <[email protected]> wrote:
> > > > > >
> > > > > >> If you use Kafka just as a redo log, you can't undo anything
> > > > > >> that's written to the log. Write-ahead logs in typical database
> > > > > >> systems are both redo and undo logs. Transaction commits and
> > > > > >> rollbacks are implemented on top of the logs. However,
> > > > > >> general-purpose write-ahead logs for transactions are much more
> > > > > >> complicated.
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jun
> > > > > >>
> > > > > >> On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <[email protected]> wrote:
> > > > > >>
> > > > > >> > This is an important feature and I am interested in helping
> > > > > >> > out in the design and implementation, though I am working on
> > > > > >> > 0.8 features for the next month, so I may not be of too much
> > > > > >> > use. I have thought a little bit about this, but I am not yet
> > > > > >> > sure of the best approach.
> > > > > >> >
> > > > > >> > Here is a specific use case I think is important to address:
> > > > > >> > consider a case where you are doing processing of one or more
> > > > > >> > streams and producing an output stream. This processing may
> > > > > >> > involve some kind of local state (say counters or other local
> > > > > >> > aggregation intermediate state). This is a common scenario. The
> > > > > >> > problem is to give reasonable semantics to this computation in
> > > > > >> > the presence of failures. The processor effectively has a
> > > > > >> > position/offset in each of its input streams as well as
> > > > > >> > whatever local state. The problem is that if this process
> > > > > >> > fails, it needs to restore to a state that matches the last
> > > > > >> > produced messages. There are several solutions to this problem.
> > > > > >> > One is to make the output somehow idempotent; this will solve
> > > > > >> > some cases, but is not a general solution, as many things
> > > > > >> > cannot be made idempotent easily.
> > > > > >> >
> > > > > >> > I think the two proposals you give outline a couple of basic
> > > > > >> > approaches:
> > > > > >> > 1. Store the messages on the server somewhere, but don't add
> > > > > >> > them to the log until the commit call.
> > > > > >> > 2. Store the messages in the log, but don't make them available
> > > > > >> > to the consumer until the commit call.
> > > > > >> > Another option you didn't mention:
> > > > > >> >
> > > > > >> > I can give several subtleties to these approaches.
> > > > > >> >
> > > > > >> > One advantage of the second approach is that messages are in
> > > > > >> > the log and can be made available for reading or not. This
> > > > > >> > makes it possible to support a kind of "dirty read" that allows
> > > > > >> > the consumer to specify whether they want to immediately see
> > > > > >> > all messages with low latency, but potentially see uncommitted
> > > > > >> > messages, or only see committed messages.
> > > > > >> >
> > > > > >> > The problem with the second approach, at least in the way you
> > > > > >> > describe it, is that you have to lock the log until the commit
> > > > > >> > occurs; otherwise you can't roll back (because someone else may
> > > > > >> > have appended their own messages, and then you can't truncate
> > > > > >> > the log). This would have all the problems of remote locks. I
> > > > > >> > think this might be a deal-breaker.
> > > > > >> >
> > > > > >> > Another variation on the second approach would be the
> > > > > >> > following: have each producer maintain an id and generation
> > > > > >> > number. Keep a schedule of valid offset/id/generation numbers
> > > > > >> > on the broker and only hand these out. This solution would
> > > > > >> > support non-blocking multi-writer appends but requires more
> > > > > >> > participation from the producer (i.e. getting a generation
> > > > > >> > number and id).
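A loose sketch of one reading of that variation (all names invented): the broker remembers the latest generation number it handed out per producer id and rejects appends from a stale generation, so a failed writer that comes back cannot interleave with its replacement:

```python
class Broker:
    """Toy broker that fences producers by (id, generation)."""
    def __init__(self):
        self.log = []
        self.generation = {}                 # producer id -> latest generation

    def register(self, producer_id):
        # Hand out a fresh generation number; this fences any older instance.
        gen = self.generation.get(producer_id, 0) + 1
        self.generation[producer_id] = gen
        return gen

    def append(self, producer_id, gen, msg):
        if self.generation.get(producer_id) != gen:
            raise PermissionError("stale generation; append rejected")
        self.log.append(msg)
        return len(self.log) - 1             # offset of the appended message
```

Because validity is tracked per producer id, two producers with current generations can append concurrently without locking the log, which is the non-blocking multi-writer property described above.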
> > > > > >> >
> > > > > >> > Cheers,
> > > > > >> >
> > > > > >> > -Jay
> > > > > >> >
> > > > > >> > On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <[email protected]> wrote:
> > > > > >> >
> > > > > >> > > I have come up with two different possibilities, both with
> > > > > >> > > different trade-offs.
> > > > > >> > >
> > > > > >> > > The first would be to support "true" transactions by writing
> > > > > >> > > transactional data into a temporary file and then copying it
> > > > > >> > > directly to the end of the partition when the commit command
> > > > > >> > > is received. The upside to this approach is that individual
> > > > > >> > > transactions can be larger than a single batch, and more than
> > > > > >> > > one producer could conduct transactions at once. The downside
> > > > > >> > > is the extra IO involved in writing the data to and reading
> > > > > >> > > it from disk an extra time.
> > > > > >> > >
> > > > > >> > > The second would be to allow any number of messages to be
> > > > > >> > > appended to a topic, but not move the "end of topic" offset
> > > > > >> > > until the commit was received. If a rollback was received, or
> > > > > >> > > the producer timed out, the partition could be truncated at
> > > > > >> > > the most recently recognized "end of topic" offset. The
> > > > > >> > > upside is that there is very little extra IO (only to store
> > > > > >> > > the official "end of topic" metadata), and it seems like it
> > > > > >> > > should be easy to implement. The downside is that this
> > > > > >> > > "transaction" feature is incompatible with anything but a
> > > > > >> > > single producer per partition.
> > > > > >> > >
> > > > > >> > > I am interested in your thoughts on these.
> > > > > >> > >
> > > > > >> > > --Tom
> > > > > >> > >
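The second proposal above can be sketched as a committed "end of topic" watermark (a toy model with invented names, not broker code): appends land past the watermark, commit advances it, rollback truncates back to it, and consumers read only below it:

```python
class TxnLog:
    """Toy partition with a committed 'end of topic' watermark."""
    def __init__(self):
        self.entries = []
        self.committed_end = 0               # official "end of topic" offset

    def append(self, msg):
        self.entries.append(msg)             # lands past the committed end

    def commit(self):
        self.committed_end = len(self.entries)

    def rollback(self):
        # Truncate at the most recently recognized "end of topic" offset.
        del self.entries[self.committed_end:]

    def readable(self):
        # Consumers never see anything past the committed end.
        return self.entries[:self.committed_end]
```

The single-producer restriction is visible here: rollback truncates everything past the watermark, so a second producer's interleaved uncommitted appends would be lost with it.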
> > > > > >> > > > On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <[email protected]> wrote:
> > > > > >> > > > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> > > > > >> > > >> The closest concept of a transaction on the publisher
> > > > > >> > > >> side, that I can think of, is using a batch of messages in
> > > > > >> > > >> a single call to the synchronous producer.
> > > > > >> > > >>
> > > > > >> > > >> Precisely, you can configure a Kafka producer to use the
> > > > > >> > > >> "sync" mode and batch messages that require transactional
> > > > > >> > > >> guarantees in a single send() call. That will ensure that
> > > > > >> > > >> either all the messages in the batch are sent or none.
> > > > > >> > > >
> > > > > >> > > > This is an interesting feature -- something I wasn't aware
> > > > > >> > > > of. Still, it doesn't solve the problem *completely*. As
> > > > > >> > > > many people realise, it's still possible for the batch of
> > > > > >> > > > messages to get into Kafka fine, but for the ack from Kafka
> > > > > >> > > > to be lost on its way back to the Producer. In that case
> > > > > >> > > > the Producer erroneously believes the messages didn't get
> > > > > >> > > > in, and might re-send them.
> > > > > >> > > >
> > > > > >> > > > You guys *haven't* solved that issue, right? I believe you
> > > > > >> > > > write about it on the Kafka site.
> > > > > >> > > >
> > > > > >> > > >>
> > > > > >> > > >> Thanks,
> > > > > >> > > >> Neha
> > > > > >> > > >>
> > > > > >> > > >> > On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <[email protected]> wrote:
> > > > > >> > > >> > Is there an accepted or recommended way to make writes
> > > > > >> > > >> > to a Kafka queue idempotent, or within a transaction?
> > > > > >> > > >> >
> > > > > >> > > >> > I can configure my system such that each queue has
> > > > > >> > > >> > exactly one producer.
> > > > > >> > > >> >
> > > > > >> > > >> > (If there are no accepted/recommended ways, I have a few
> > > > > >> > > >> > ideas I would like to propose. I would also be willing
> > > > > >> > > >> > to implement them if needed.)
> > > > > >> > > >> >
> > > > > >> > > >> > Thanks in advance!
> > > > > >> > > >> >
> > > > > >> > > >> > --Tom
> > > > > >> > > >
> > > > > >> > > > --
> > > > > >> > > > Philip O'Toole
> > > > > >> > > >
> > > > > >> > > > Senior Developer
> > > > > >> > > > Loggly, Inc.
> > > > > >> > > > San Francisco, Calif.
> > > > > >> > > > www.loggly.com
> > > > > >> > > >
> > > > > >> > > > Come join us!
> > > > > >> > > > http://loggly.com/company/careers/
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
>
