Guozhang,

Thanks. I'm thinking not of threads crashing but of processes/VMs/networks disappearing. I want a lights-out design, so that if any of the computers/routers catch fire I can still sleep.
I think I can get what I want by spinning up N ZookeeperConsumerConnectors on the topic, each with one thread, rather than multiple threads on a single consumer. That eliminates the need for synchronization and gives me parallelism. Am I correct?

Thanks again,
C

On Wed, Jan 29, 2014 at 4:00 PM, Guozhang Wang <[email protected]> wrote:

> Hi Clark,
>
> 1. This is true, you need to synchronize these consumer threads when
> calling commitOffsets();
>
> 2. If you are asking what if the consumer thread crashed after
>
> currentTopicInfo.resetConsumeOffset(consumedOffset)
>
> within the next() call, then on its startup, it will lose all these
> in-memory offsets, and read from the ZK which will be smaller than the
> current value, still leading to duplicates but not data losses.
>
> Guozhang
>
>
> On Wed, Jan 29, 2014 at 12:31 PM, Clark Breyman <[email protected]> wrote:
>
> > Guozhang,
> >
> > That makes sense except for the following:
> >
> > - the ZookeeperConsumerConnector.commitOffsets() method commits the
> > current value of PartitionTopicInfo.consumeOffset for all of the
> > active streams.
> >
> > - the ConsumerIterator in the streams advances the value of
> > PartitionTopicInfo.consumeOffset *before* next() returns, not after the
> > processing on that message is complete.
> >
> > If you have multiple threads consuming, thread A calling commitOffsets()
> > may commit thread B's retrieved but unprocessed message, no?
> >
> >
> > On Wed, Jan 29, 2014 at 12:20 PM, Guozhang Wang <[email protected]> wrote:
> >
> > > Hi Clark,
> > >
> > > In practice, the client app code needs to always commit offset after
> > > it has processed the messages, and hence only the second case may
> > > happen, leading to "at least once".
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman <[email protected]> wrote:
> > >
> > > > Wrestling through the at-least/most-once semantics of my
> > > > application and I was hoping for some confirmation of the
> > > > semantics. I'm not sure I can classify the high level consumer as
> > > > either type.
> > > >
> > > > False ack scenario:
> > > > - Thread A: call next() on the ConsumerIterator, advancing the
> > > > PartitionTopicInfo offset
> > > > - Thread B: commitOffsets() flushed offset of incomplete message to ZK
> > > > - Thread A: fail processing (e.g. kill -9)
> > > >
> > > > False retry scenario:
> > > > - Thread A: call next() & successfully process, kill -9 before
> > > > commitOffsets either in thread or in parallel.
> > > >
> > > > Is this right or am I missing something (likely)? Seems like the
> > > > semantics are essentially approximately once.
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
>
> --
> -- Guozhang
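
For anyone following along, here is a rough toy model of the two scenarios discussed in this thread. It is only a sketch in plain Python and does not use Kafka's API: ToyConnector, next_message(), commit_offsets(), restart() and both scenario functions are hypothetical names made up for illustration. It assumes one partition, an in-memory offset that advances before processing (as described above for ConsumerIterator.next()), and a restart that resumes from the last committed offset.

```python
from dataclasses import dataclass


@dataclass
class ToyConnector:
    """Hypothetical stand-in for one connector owning one partition (not Kafka's API)."""
    log: list                   # messages in the partition
    consume_offset: int = 0     # in-memory offset, advanced by next_message()
    committed_offset: int = 0   # offset persisted to the simulated ZK

    def next_message(self):
        # As described in the thread: the in-memory offset moves forward
        # *before* the caller has processed the returned message.
        msg = self.log[self.consume_offset]
        self.consume_offset += 1
        return msg

    def commit_offsets(self):
        # Persist whatever the in-memory offset currently is.
        self.committed_offset = self.consume_offset

    def restart(self):
        # A crash loses the in-memory offset; resume from the committed one.
        self.consume_offset = self.committed_offset


def false_ack(log):
    """Shared connector: another thread commits while message 0 is still unprocessed."""
    c = ToyConnector(log)
    processed = []
    c.next_message()     # thread A fetches message 0, offset advances to 1
    c.commit_offsets()   # thread B commits offset 1
    c.restart()          # thread A is killed before processing message 0
    while c.consume_offset < len(log):
        processed.append(c.next_message())
        c.commit_offsets()
    return processed


def commit_after_processing(log, crash_before_commit_at):
    """One thread per connector: commit only after processing, with one simulated crash."""
    c = ToyConnector(log)
    processed = []
    crashed = False
    while c.consume_offset < len(log):
        processed.append(c.next_message())   # processing completes here
        if not crashed and c.consume_offset - 1 == crash_before_commit_at:
            crashed = True
            c.restart()                      # kill -9 before the commit
            continue
        c.commit_offsets()
    return processed


print(false_ack(["m0", "m1", "m2"]))                   # ['m1', 'm2']
print(commit_after_processing(["m0", "m1", "m2"], 1))  # ['m0', 'm1', 'm1', 'm2']
```

In this toy model the shared-connector case drops m0 (the false ack), while the one-thread-per-connector case with commit-after-process only ever re-delivers a message (the false retry), which matches the "at least once" reading above. Whether the real ZookeeperConsumerConnector behaves the same way under rebalancing is not something this sketch can show.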
