Hi Edward,
I think you misunderstand ... I definitely do *not* want to commit
every message. That would be much too expensive; every few minutes is
plenty for me.
I want to guarantee that if I commit a message, the message has
been processed by *my application* at least once. (In another thread
I mentioned something about reading exactly once, but that is not what
I am going for here -- more than once is OK, as long as it's at least
once.) That requires me to make sure that a commit never happens
between a call to next() and doSomething(). Kafka guarantees that
messages get read off the queue at least once, but as I outlined
above, a naive solution allows for messages to get read off the queue,
but not make it into my app.
Not only do I not want to commit every message -- I'd really like to
have the above guarantee without even acquiring a lock (most of the
time). That is what I was getting at with the comment about the
spin-lock, just as an idea of how to prevent a commit between next()
and doSomething(). I dunno if the spin-lock was really the right
idea; that was just a random thought, but the point is, we want some
check that should be very cheap for 99.999% of the time when a commit
isn't happening, but can still guarantee proper ordering during that
0.001% of the time when a commit is happening. Otherwise, messages
might never make it to my app. (And this is all just to prevent a
message getting lost during the 10^-6% of the time when a commit might
happen between next() and doSomething(), and the app dies before
doSomething completes!)
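To make the idea concrete, here is a rough Java sketch of the kind of cheap check I mean (all the names here are made up -- this is not the actual consumer code, just the ordering I'm after): the reader holds an in-flight flag across next() and doSomething(), and the committer spins on that same flag, so a commit can never observe an offset for a message that hasn't finished processing:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch -- consumedOffset, processedCount, and the loop
// bodies are stand-ins, not real Kafka consumer internals.
public class GuardedCommitSketch {
    static final AtomicBoolean inFlight = new AtomicBoolean(false);
    static volatile int consumedOffset = 0;  // what next() advances
    static volatile int processedCount = 0;  // advanced after doSomething()

    // Reader: hold the guard across next() and doSomething(), so a
    // commit can never land between them.
    static void readLoop(int messages) {
        for (int i = 0; i < messages; i++) {
            while (!inFlight.compareAndSet(false, true)) { /* spin (rare) */ }
            consumedOffset++;   // what next() does
            processedCount++;   // doSomething(msg) completed
            inFlight.set(false);
        }
    }

    // Committer: take the same guard, so the offset it reads only ever
    // reflects fully processed messages.
    static boolean commitLoopOk(int commits) {
        for (int i = 0; i < commits; i++) {
            while (!inFlight.compareAndSet(false, true)) { /* spin */ }
            boolean safe = (consumedOffset == processedCount);
            inFlight.set(false);
            if (!safe) return false; // would have committed an unprocessed msg
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        final boolean[] ok = {true};
        Thread reader = new Thread(() -> readLoop(100_000));
        Thread committer = new Thread(() -> ok[0] = commitLoopOk(10_000));
        reader.start(); committer.start();
        reader.join(); committer.join();
        System.out.println(ok[0]
            ? "no commit ever outran processing"
            : "race: commit saw an unprocessed message");
    }
}
```

The uncontended case is just one compare-and-set, which is about as cheap as it gets; the spin only ever happens during the tiny window when a commit and a read actually overlap.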
Actually after a bit more thought -- the best way to guarantee this
would be with a small API change that would do away with the need for
locks completely. The iterator should have a method applyToNext:
def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
  // all the stuff in next(), *except*
  // currentTopicInfo.resetConsumeOffset(consumedOffset)
  val item = ...
  f(item)
  // after we've applied the user's function, now we can update the
  // offset that should get committed
  currentTopicInfo.resetConsumeOffset(consumedOffset)
}
I think this way, you could avoid any problems even with auto-commit.
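In case the Scala pseudocode is unclear, here is the same ordering mocked up in plain Java (invented names, not the real ConsumerIterator -- the only point is that the committable offset moves *after* the user's function returns):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper, not the real consumer: f runs *before* the
// committable offset advances, so a commit (even an auto-commit) taken
// mid-processing would just re-deliver the current message.
public class ApplyToNextSketch<T> {
    private final Iterator<T> underlying;
    private long committableOffset = 0;  // what a commit would write

    public ApplyToNextSketch(Iterator<T> underlying) {
        this.underlying = underlying;
    }

    public void applyToNext(Consumer<T> f) {
        T item = underlying.next();
        f.accept(item);       // user's processing happens first...
        committableOffset++;  // ...only then does the offset advance
    }

    public long committableOffset() { return committableOffset; }

    public static void main(String[] args) {
        ApplyToNextSketch<String> it =
            new ApplyToNextSketch<>(List.of("a", "b", "c").iterator());
        it.applyToNext(msg -> {
            // a commit firing right here would still re-deliver msg
            System.out.println("processing " + msg
                + ", committable offset = " + it.committableOffset());
        });
        System.out.println("after: committable offset = "
            + it.committableOffset());
    }
}
```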
Or, if you don't want to add a new method, so we can stick to the
Iterator API, then maybe the iterator could let you register a
preCommitFunction, so next() would change to:
override def next(): MessageAndMetadata[K, V] = {
  ...
  val item = ...
  preCommitFunctions.foreach{ f => f(item) }
  *currentTopicInfo.resetConsumeOffset(consumedOffset)*
  ...
  item
}
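The nice part of this variant is that the calling code keeps the ordinary Iterator shape. A rough Java mock of what I mean (again, invented names, not the real ConsumerIterator):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical: an iterator that runs registered pre-commit hooks on
// each item *before* advancing the committable offset, so the plain
// hasNext()/next() loop shape is preserved.
public class PreCommitIteratorSketch<T> implements Iterator<T> {
    private final Iterator<T> underlying;
    private final List<Consumer<T>> preCommitFunctions = new ArrayList<>();
    private long committableOffset = 0;

    public PreCommitIteratorSketch(Iterator<T> underlying) {
        this.underlying = underlying;
    }

    public void registerPreCommitFunction(Consumer<T> f) {
        preCommitFunctions.add(f);
    }

    @Override public boolean hasNext() { return underlying.hasNext(); }

    @Override public T next() {
        T item = underlying.next();
        preCommitFunctions.forEach(f -> f.accept(item)); // process first
        committableOffset++;                             // then advance
        return item;
    }

    public long committableOffset() { return committableOffset; }

    public static void main(String[] args) {
        PreCommitIteratorSketch<String> it =
            new PreCommitIteratorSketch<>(List.of("a", "b", "c").iterator());
        it.registerPreCommitFunction(msg ->
            System.out.println("doSomething(" + msg + ")"));
        while (it.hasNext()) { it.next(); }  // loop shape doesn't change
        System.out.println("committable offset = " + it.committableOffset());
    }
}
```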
thanks,
Imran
On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <[email protected]> wrote:
> You likely need to use a custom offset solution if you plan on committing
> every message. With many partitions this puts a large burden on Zookeeper,
> you end up needing to roll over your zk transaction logs fast as well or
> risk filling up the disk.
>
>
> On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <[email protected]> wrote:
>
>> Hello Imran,
>>
>> The offset will only be updated when the next() function is called:
>>
>> override def next(): MessageAndMetadata[K, V] = {
>> ...
>> *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> ...
>> item
>> }
>>
>> instead of in makeNext(), which will just update consumedOffset, but that
>> is not the value that will be committed using the commitOffset call. So as
>> long as you turn off auto-commit and only call commitOffset after the
>> process(msg) call, not after the
>>
>> b = iter.next()
>>
>> is called, at-least-once is guaranteed.
>>
>> Does that make sense?
>>
>> Guozhang
>>
>>
>> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <[email protected]>
>> wrote:
>>
>> > sorry to keep bugging the list, but I feel like I am either missing
>> > something important, or I'm finding something wrong w/ the standard
>> > consumer API (or maybe the docs just need some clarification).
>> >
>> > I started to think that I should probably just accept at least once
>> > semantics ... but I eventually realized that I'm not even sure we
>> > really get an at least once guarantee. I think it really might be
>> > zero-or-more. Or rather, messages will get pulled off the kafka queue
>> > at least once. But that doesn't mean your app will actually *process*
>> > those messages at least once -- there might be messages it never
>> > processes.
>> >
>> > Consider a really basic reader of a kafka queue:
>> >
>> > while(it.hasNext()){
>> > val msg = it.next()
>> > doSomething(msg)
>> > }
>> >
>> > the question is, do I have any guarantees on how many times
>> > doSomething() is called on everything in the queue? I think the
>> > "guarantee" is:
>> > 1) most messages will get processed exactly once
>> > 2) around a restart, a chunk of msgs will get processed at least once,
>> > but probably more than once
>> > 3) around a restart, it is possible that one message will get
>> > processed ZERO times
>> >
>> > (1) & (2) are probably clear, so let me explain how I think (3) could
>> > happen. Let's imagine messages a, b, c, ... and two threads, one reading
>> > from the stream, and one thread that periodically commits the offsets.
>> > Imagine this sequence of events:
>> >
>> >
>> > ==Reader==
>> > -initializes w/ offset pointing to "a"
>> >
>> > -hasNext()
>> > ---> makeNext() will read "a"
>> > and update the local offset to "b"
>> >
>> > -msg = "a"
>> >
>> > -doSomething("a")
>> >
>> > -hasNext()
>> > ----> makeNext() will read "b"
>> > and update the local offset "c"
>> >
>> > ==Committer==
>> >
>> > -commitOffsets stores the current offset as "c"
>> >
>> >
>> >
>> > =====PROCESS DIES=====
>> > ===== RESTARTS =====
>> >
>> > ==Reader==
>> > -initializes w/ offset pointing to "c"
>> >
>> > -hasNext()
>> > --> makeNext() will read "c"
>> > and update local offset to "d"
>> > -msg = "c"
>> > -doSomething("c")
>> > ...
>> >
>> >
>> >
>> > note that in this scenario, doSomething("b") was never called.
>> > Probably for a lot of applications this doesn't matter. But it seems
>> > like this could be terrible for some apps. I can't think of any
>> > way of preventing it from user code. Unless, maybe, when the offsets
>> > get committed, it is always *before* the last thing read? E.g., in my
>> > example, it would store the next offset as "b" or earlier?
>> >
>> > Is there a flaw in my logic? Do committed offsets always "undershoot"
>> > to prevent this?
>> >
>> > thanks,
>> > Imran
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>